添加AI图像生成接口、修改相关界面

This commit is contained in:
xinzhu.yin
2026-04-29 15:25:58 +08:00
parent e243fc4e94
commit 377bba2a0b
4 changed files with 443 additions and 134 deletions

View File

@@ -1,6 +1,10 @@
"""AI 图片生成服务:后端请求 + 本地缓存管理。
API 端点待接入,当前通过 ``set_api_caller`` 注入具体实现。
后端接口(测试环境):
POST {API_BASE_URL}{API_PATH}
body: {"user_message": str, "session_id": str}
resp: {"code": 200, "message": "", "data": {"imageUrl": "..."}}
缓存目录:``settings/ai_image_cache/``,每张图片有同名的 ``.json`` 侧车记录。
"""
@@ -9,15 +13,24 @@ from __future__ import annotations
import datetime as _dt
import hashlib
import json
import logging
import mimetypes
import os
import shutil
import threading
import time
import uuid
from io import BytesIO
from dataclasses import dataclass, asdict
from typing import Callable, List, Optional
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from PIL import Image
logger = logging.getLogger(__name__)
# ---------- 常量 ----------
@@ -25,41 +38,166 @@ _CACHE_DIRNAME = os.path.join("settings", "ai_image_cache")
_META_SUFFIX = ".json"
_SUPPORTED_IMG_EXT = (".png", ".jpg", ".jpeg", ".bmp", ".webp")
# 测试环境后端
API_BASE_URL = "http://10.201.44.70:9018/ai-agent/"
API_PATH = "api/v1/pqtest/generate"
API_TIMEOUT = 90.0 # 后端最长 60s留余量
# 进程级会话 id多轮对话需保持一致可通过 ``reset_session`` 重置
_session_id: str = str(uuid.uuid4())
_session_lock = threading.Lock()
def get_session_id() -> str:
with _session_lock:
return _session_id
def set_session_id(session_id: str) -> str:
"""切换到指定会话。空值会抛错。"""
global _session_id
sid = (session_id or "").strip()
if not sid:
raise ValueError("session_id 不能为空")
with _session_lock:
old = _session_id
_session_id = sid
logger.info("[AIImage] 会话切换 %s -> %s", _mask_sid(old), _mask_sid(_session_id))
return _session_id
def reset_session() -> str:
"""开启新一轮会话,返回新的 session_id。"""
global _session_id
with _session_lock:
old = _session_id
_session_id = str(uuid.uuid4())
logger.info("[AIImage] 会话切换 %s -> %s", _mask_sid(old), _mask_sid(_session_id))
return _session_id
def _mask_sid(sid: str) -> str:
"""日志安全展示:仅保留前 8 位。"""
if not sid:
return "(none)"
return f"{sid[:8]}"
def _truncate(text: str, n: int = 80) -> str:
s = (text or "").replace("\n", " ").strip()
return s if len(s) <= n else s[:n] + ""
# ---------- 数据结构 ----------
@dataclass
class AIImageRecord:
"""一条缓存记录。"""
"""一条缓存记录。
字段说明:
- ``id``: 唯一 id等同于磁盘文件名不含扩展名格式 ``{时间戳}_{md5前8}``。
- ``prompt``: 用户原始输入(完整保留,用于回溯/调试,不应被改写)。
- ``title``: 用户自定义展示标题重命名时写入UI 优先使用,留空则回退 prompt 第一行截断。
- ``image_path``: 图片在缓存目录中的绝对路径。
- ``created_at``: ISO8601 时间字符串。
- ``extra``: 其它元数据,至少包含 ``source`` 与 ``session_id``(标识属于哪一轮对话)。
"""
id: str
prompt: str
image_path: str
created_at: str # ISO8601
extra: Optional[dict] = None
title: Optional[str] = None
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False, indent=2)
@property
def display_name(self) -> str:
"""UI 展示名title 优先,否则回退 prompt 第一行。"""
if self.title:
return self.title.strip()
first = (self.prompt or "").strip().splitlines()[0] if self.prompt else ""
return first or "(未命名)"
# ---------- API 注入 ----------
# 调用签名: ``fn(prompt: str) -> (image_bytes: bytes, image_ext: str, extra: dict|None)``
# ``image_ext`` 例如 ``".png"````extra`` 可为 None。
_ApiCaller = Callable[[str], tuple]
_api_caller: Optional[_ApiCaller] = None
@property
def session_id(self) -> str:
if isinstance(self.extra, dict):
return str(self.extra.get("session_id") or "")
return ""
def set_api_caller(fn: Optional[_ApiCaller]) -> None:
"""注入真实的后端 API 调用函数。在 API 就绪前可保持为 None。"""
global _api_caller
_api_caller = fn
# ---------- 后端 API ----------
def has_api() -> bool:
return _api_caller is not None
def _api_endpoint() -> str:
base = API_BASE_URL if API_BASE_URL.endswith("/") else API_BASE_URL + "/"
return base + API_PATH.lstrip("/")
def _call_pqtest_generate(user_message: str, session_id: str, timeout: float = API_TIMEOUT) -> str:
"""调用后端 ``api/v1/pqtest/generate``,返回 imageUrl。失败抛异常。"""
payload = json.dumps(
{"user_message": user_message,
"session_id": session_id},
ensure_ascii=False,
).encode("utf-8")
endpoint = _api_endpoint()
logger.info(
"[AIImage] 请求生成 sid=%s prompt_len=%d prompt=%r",
_mask_sid(session_id), len(user_message or ""), _truncate(user_message),
)
logger.debug("[AIImage] POST %s timeout=%.1fs", endpoint, timeout)
request = Request(
endpoint,
data=payload,
method="POST",
headers={
"Content-Type": "application/json; charset=utf-8",
"Accept": "application/json",
"User-Agent": "pqAutomationApp/1.0",
},
)
t0 = time.monotonic()
try:
with urlopen(request, timeout=timeout) as response:
raw = response.read()
http_status = response.status
except Exception as exc:
elapsed = time.monotonic() - t0
logger.error(
"[AIImage] 请求异常 sid=%s elapsed=%.2fs %s: %s",
_mask_sid(session_id), elapsed, type(exc).__name__, exc,
)
raise
elapsed = time.monotonic() - t0
logger.debug(
"[AIImage] HTTP %s 收到 %d bytes elapsed=%.2fs",
http_status, len(raw), elapsed,
)
try:
result = json.loads(raw.decode("utf-8"))
except Exception as exc:
logger.error("[AIImage] 响应解析失败 sid=%s raw=%r", _mask_sid(session_id), raw[:200])
raise RuntimeError(f"AI 接口返回非 JSON{raw[:200]!r}") from exc
code = result.get("code")
message = result.get("message") or ""
data = result.get("data") or {}
image_url = (data.get("imageUrl") or "").strip()
if code != 200 or not image_url:
logger.warning(
"[AIImage] 接口失败 sid=%s code=%s msg=%r",
_mask_sid(session_id), code, message,
)
raise RuntimeError(f"AI 接口失败 code={code} msg={message or '生成失败'}")
logger.info(
"[AIImage] 生成成功 sid=%s elapsed=%.2fs url=%s",
_mask_sid(session_id), elapsed, image_url,
)
return image_url
# ---------- 缓存路径工具 ----------
@@ -83,6 +221,28 @@ def _meta_path_for(image_path: str) -> str:
return os.path.splitext(image_path)[0] + _META_SUFFIX
def _sanitize_image_bytes(image_bytes: bytes, image_ext: str) -> bytes:
"""规范化图片字节,尽量去掉已知有问题的 PNG ICC profile。"""
ext = (image_ext or "").lower()
if ext not in {".png", ".jpg", ".jpeg", ".bmp", ".webp"}:
return image_bytes
try:
with Image.open(BytesIO(image_bytes)) as img:
img.load()
normalized = img.copy()
output = BytesIO()
save_kwargs = {}
if ext == ".png":
save_kwargs["icc_profile"] = None
normalized.save(output, format=normalized.format or ext.lstrip(".").upper(), **save_kwargs)
result = output.getvalue()
if result:
return result
except Exception as exc:
logger.warning("[AIImage] 图片规范化失败 ext=%s %s: %s", ext, type(exc).__name__, exc)
return image_bytes
# ---------- 读写 ----------
@@ -98,6 +258,7 @@ def list_records(base_dir: Optional[str] = None) -> List[AIImageRecord]:
prompt = ""
created_at = ""
extra = None
title = None
rec_id = os.path.splitext(name)[0]
if os.path.isfile(meta_path):
try:
@@ -106,6 +267,7 @@ def list_records(base_dir: Optional[str] = None) -> List[AIImageRecord]:
prompt = data.get("prompt", "")
created_at = data.get("created_at", "")
extra = data.get("extra")
title = data.get("title")
rec_id = data.get("id", rec_id)
except Exception:
pass
@@ -123,54 +285,27 @@ def list_records(base_dir: Optional[str] = None) -> List[AIImageRecord]:
image_path=full,
created_at=created_at,
extra=extra,
title=title,
)
)
if not records:
seeded = _seed_placeholder_record(cache_dir)
if seeded is not None:
records.append(seeded)
records.sort(key=lambda r: r.created_at, reverse=True)
return records
def _seed_placeholder_record(cache_dir: str) -> Optional[AIImageRecord]:
"""当缓存为空时,写入一张本地占位图,便于前端联调。"""
try:
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
src = os.path.join(repo_root, "assets", "entry_1.png")
if not os.path.isfile(src):
return None
rec_id = f"{_dt.datetime.now().strftime('%Y%m%d_%H%M%S')}_placeholder"
image_path = os.path.join(cache_dir, f"{rec_id}.png")
shutil.copyfile(src, image_path)
record = AIImageRecord(
id=rec_id,
prompt="本地测试占位图(后端未接入)",
image_path=image_path,
created_at=_dt.datetime.now().isoformat(timespec="seconds"),
extra={"source": "local-placeholder"},
)
with open(_meta_path_for(image_path), "w", encoding="utf-8") as f:
f.write(record.to_json())
return record
except Exception:
return None
def save_image_to_cache(
prompt: str,
image_bytes: bytes,
image_ext: str = ".png",
extra: Optional[dict] = None,
base_dir: Optional[str] = None,
title: Optional[str] = None,
) -> AIImageRecord:
"""把生成的图片字节写入缓存,返回记录。"""
if not image_ext.startswith("."):
image_ext = "." + image_ext
if image_ext.lower() not in _SUPPORTED_IMG_EXT:
image_ext = ".png"
image_bytes = _sanitize_image_bytes(image_bytes, image_ext)
cache_dir = get_cache_dir(base_dir)
rec_id = _make_id(prompt)
image_path = os.path.join(cache_dir, f"{rec_id}{image_ext}")
@@ -183,6 +318,7 @@ def save_image_to_cache(
image_path=image_path,
created_at=_dt.datetime.now().isoformat(timespec="seconds"),
extra=extra,
title=title,
)
try:
with open(_meta_path_for(image_path), "w", encoding="utf-8") as f:
@@ -256,6 +392,53 @@ def export_record(record: AIImageRecord, dest_path: str) -> None:
shutil.copyfile(record.image_path, dest_path)
def update_record_title(record: AIImageRecord, new_title: Optional[str]) -> bool:
"""更新记录的展示标题并写回侧车 JSON。空串/None 视为清除标题。"""
title = (new_title or "").strip() or None
meta_path = _meta_path_for(record.image_path)
try:
data: dict = {}
if os.path.isfile(meta_path):
with open(meta_path, "r", encoding="utf-8") as f:
data = json.load(f) or {}
if title is None:
data.pop("title", None)
else:
data["title"] = title
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception:
return False
record.title = title
return True
def group_records_by_session(records: List[AIImageRecord]) -> List[dict]:
"""按 ``session_id`` 分组。
返回元素:``{"session_id", "records", "started_at", "latest_at"}``。
会话按"最近使用时间"倒序,组内记录按时间倒序。
没有 session_id 的记录归到空串 ``""`` 组。
"""
buckets: dict = {}
for rec in records:
buckets.setdefault(rec.session_id, []).append(rec)
sessions = []
for sid, recs in buckets.items():
recs.sort(key=lambda r: r.created_at, reverse=True)
started_at = min((r.created_at for r in recs if r.created_at), default="")
sessions.append(
{
"session_id": sid,
"records": recs,
"started_at": started_at,
"latest_at": recs[0].created_at if recs else "",
}
)
sessions.sort(key=lambda s: s["latest_at"], reverse=True)
return sessions
# ---------- 异步请求 ----------
@@ -264,27 +447,37 @@ def request_image_async(
on_success: Callable[[AIImageRecord], None],
on_error: Callable[[Exception], None],
base_dir: Optional[str] = None,
session_id: Optional[str] = None,
) -> threading.Thread:
"""在后台线程请求 API → 写入缓存 → 回调。
"""在后台线程调用后端 API → 下载图片 → 写入缓存 → 回调。
``on_success`` / ``on_error`` 会在 **工作线程** 中被调用UI 侧若需
切回主线程,请在回调内部自行用 ``root.after(0, ...)``。
``session_id`` 留空则使用进程级会话 id保证多轮对话上下文
"""
sid = session_id or get_session_id()
def _worker():
try:
if _api_caller is None:
raise RuntimeError("AI 图片 API 尚未接入,请调用 set_api_caller 注入")
image_bytes, image_ext, extra = _normalize_api_result(_api_caller(prompt))
record = save_image_to_cache(
image_url = _call_pqtest_generate(prompt, sid)
record = import_image_from_url(
image_url=image_url,
prompt=prompt,
image_bytes=image_bytes,
image_ext=image_ext,
extra=extra,
extra={"source": "ai-api", "session_id": sid},
base_dir=base_dir,
)
logger.info(
"[AIImage] 已写入缓存 sid=%s id=%s path=%s",
_mask_sid(sid), record.id, record.image_path,
)
on_success(record)
except Exception as exc:
logger.error(
"[AIImage] 生成流程失败 sid=%s %s: %s",
_mask_sid(sid), type(exc).__name__, exc,
)
on_error(exc)
t = threading.Thread(target=_worker, daemon=True)
@@ -321,18 +514,6 @@ def import_image_from_url_async(
return t
def _normalize_api_result(result):
"""允许 API 返回 ``bytes`` 或 ``(bytes, ext)`` 或 ``(bytes, ext, extra)``。"""
if isinstance(result, (bytes, bytearray)):
return bytes(result), ".png", None
if isinstance(result, tuple):
if len(result) == 2:
return bytes(result[0]), str(result[1]), None
if len(result) == 3:
return bytes(result[0]), str(result[1]), result[2]
raise ValueError("API 返回格式不支持,需为 bytes 或 (bytes, ext[, extra])")
def is_remote_image_url(value: str) -> bool:
"""判断输入是否为 http/https 图片地址。"""
url = (value or "").strip()

View File

@@ -28,6 +28,8 @@ def create_ai_image_panel(self):
self.ai_image_current = None # AIImageRecord | None
self.ai_image_photo = None # 防止 ImageTk.PhotoImage 被 GC
self._ai_image_requesting = False
self._ai_image_progress_job = None
self._ai_image_progress_phase = 0
container = ttk.Frame(frame, padding=10)
container.pack(fill=tk.BOTH, expand=True)
@@ -150,11 +152,24 @@ def create_ai_image_panel(self):
send_row, textvariable=self.ai_image_status_var,
foreground="#888", font=("微软雅黑", 9),
).pack(side=tk.LEFT)
self.ai_image_progress = ttk.Progressbar(
send_row,
mode="indeterminate",
length=120,
bootstyle="info-striped",
)
self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0))
self.ai_image_progress.pack_forget()
self.ai_image_send_btn = ttk.Button(
send_row, text="发送", bootstyle="primary", width=10,
command=lambda: _send_prompt(self),
)
self.ai_image_send_btn.pack(side=tk.RIGHT)
self.ai_image_new_session_btn = ttk.Button(
send_row, text="新对话", bootstyle="secondary-outline", width=10,
command=lambda: _start_new_session(self),
)
self.ai_image_new_session_btn.pack(side=tk.RIGHT, padx=(0, 6))
# 注册面板
self.register_panel("ai_image", frame, None, "ai_image_visible")
@@ -173,17 +188,49 @@ def toggle_ai_image_panel(self):
def reload_ai_image_list(self):
"""重新扫描缓存并刷新列表。"""
"""重新扫描缓存并刷新列表。
按 ``session_id`` 分组:每个会话用一行不可选的分隔头(``── 会话 #N · 时间 ──``
其下列出该轮生成的所有图片。会话按"最近使用"倒序,组内按时间倒序。
"""
self.ai_image_records = _svc.list_records()
self.ai_image_listbox.delete(0, tk.END)
for rec in self.ai_image_records:
label = _format_list_label(rec)
self.ai_image_listbox.insert(tk.END, label)
# 维护行号 → 记录索引的映射;分隔头处为 None
self._ai_image_row_map = []
self._ai_image_row_session_map = []
sessions = _svc.group_records_by_session(self.ai_image_records)
flat = []
current_sid = _svc.get_session_id()
for idx, sess in enumerate(sessions, start=1):
sid = sess["session_id"]
is_current = sid and sid == current_sid
header = _format_session_header(idx, sess, is_current=is_current)
self.ai_image_listbox.insert(tk.END, header)
# 头部行:禁用选中(视觉上变灰)
last = self.ai_image_listbox.size() - 1
self.ai_image_listbox.itemconfig(
last, foreground="#888", selectforeground="#888",
background="#f5f5f5", selectbackground="#f5f5f5",
)
self._ai_image_row_map.append(None)
self._ai_image_row_session_map.append(sid)
for rec in sess["records"]:
label = " " + _format_list_label(rec)
self.ai_image_listbox.insert(tk.END, label)
self._ai_image_row_map.append(len(flat))
self._ai_image_row_session_map.append(rec.session_id)
flat.append(rec)
# 替换为按显示顺序展平后的列表,便于其它逻辑(前一张/后一张等)
self.ai_image_records = flat
if self.ai_image_records:
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(0)
self.ai_image_listbox.activate(0)
_select_record(self, self.ai_image_records[0])
# 选中第一张实际记录
for row, ridx in enumerate(self._ai_image_row_map):
if ridx is not None:
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(row)
self.ai_image_listbox.activate(row)
_select_record(self, self.ai_image_records[ridx])
break
else:
self.ai_image_current = None
self.ai_image_photo = None
@@ -191,6 +238,14 @@ def reload_ai_image_list(self):
self.ai_image_meta_var.set("暂无缓存图片")
def _format_session_header(index: int, sess: dict, is_current: bool) -> str:
started = (sess.get("started_at") or "").replace("T", " ")[:16]
tag = "(当前)" if is_current else ""
if sess.get("session_id"):
return f"── 会话 #{index} · {started} {tag}──"
return f"── 未归类 · {started} ──"
def _format_list_label(rec: _svc.AIImageRecord) -> str:
# 分辨率前缀:优先从 extra["size"] 取,其次从文件名猜
size_tag = ""
@@ -205,21 +260,34 @@ def _format_list_label(rec: _svc.AIImageRecord) -> str:
except Exception:
pass
prompt_line = (rec.prompt or "(无提示)").strip().splitlines()[0]
# 剩余可用宽度width=34)去掉 size_tag
max_prompt = 34 - len(size_tag) - 2
if max_prompt > 4 and len(prompt_line) > max_prompt:
prompt_line = prompt_line[:max_prompt] + ""
return f"{size_tag}{prompt_line}"
name_line = rec.display_name.splitlines()[0] if rec.display_name else "(未命名)"
# 列表宽度 width=34,需要扣除两格缩进 + size_tag
max_name = 34 - 2 - len(size_tag) - 2
if max_name > 4 and len(name_line) > max_name:
name_line = name_line[:max_name] + ""
return f"{size_tag}{name_line}"
def _on_list_select(self):
sel = self.ai_image_listbox.curselection()
if not sel:
return
idx = sel[0]
if 0 <= idx < len(self.ai_image_records):
_select_record(self, self.ai_image_records[idx])
row = sel[0]
row_map = getattr(self, "_ai_image_row_map", None) or []
if row >= len(row_map):
return
ridx = row_map[row]
if ridx is None:
session_id = _session_id_for_row(self, row)
if session_id:
_switch_to_session(self, session_id, show_message=False)
self.ai_image_listbox.selection_clear(row)
return
if 0 <= ridx < len(self.ai_image_records):
rec = self.ai_image_records[ridx]
if rec.session_id:
_switch_to_session(self, rec.session_id, show_message=False, target_record_id=rec.id)
_select_record(self, rec)
def _select_record(self, rec: _svc.AIImageRecord):
@@ -243,6 +311,7 @@ def _redraw_preview(self):
ch = canvas.winfo_height() or 1
try:
img = Image.open(rec.image_path)
img.load()
except Exception as exc:
canvas.create_text(cw // 2, ch // 2, text=f"加载失败: {exc}", fill="#f66")
return
@@ -257,6 +326,58 @@ def _redraw_preview(self):
# ---------------- 发送 / 保存 / 删除 ----------------
def _start_new_session(self):
"""开启新的对话会话,后续生成将使用新的 session_id。"""
if getattr(self, "_ai_image_requesting", False):
messagebox.showinfo("提示", "请等待当前请求完成")
return
_svc.reset_session()
self.ai_image_status_var.set("已开启新对话")
reload_ai_image_list(self)
def _session_id_for_row(self, row: int) -> str:
session_map = getattr(self, "_ai_image_row_session_map", None) or []
if row < 0 or row >= len(session_map):
return ""
return session_map[row] or ""
def _switch_to_session(self, session_id: str, show_message: bool = True, target_record_id: str = ""):
sid = (session_id or "").strip()
if not sid:
return
if sid == _svc.get_session_id():
return
_svc.set_session_id(sid)
reload_ai_image_list(self)
if target_record_id:
for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []):
if ridx is None:
continue
rec = self.ai_image_records[ridx]
if rec.id == target_record_id:
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(row)
self.ai_image_listbox.activate(row)
self.ai_image_listbox.see(row)
_select_record(self, rec)
break
self.ai_image_status_var.set("已切换到历史对话")
if show_message:
messagebox.showinfo("提示", "已切换到所选历史对话")
def _update_request_progress(self):
if not getattr(self, "_ai_image_requesting", False):
self._ai_image_progress_job = None
return
phases = ["后端处理中…", "正在生成图片…", "正在下载结果…", "即将完成…"]
self.ai_image_status_var.set(phases[self._ai_image_progress_phase % len(phases)])
self._ai_image_progress_phase += 1
self._ai_image_progress_job = self.root.after(900, lambda: _update_request_progress(self))
def _send_prompt(self):
if getattr(self, "_ai_image_requesting", False):
return
@@ -267,7 +388,7 @@ def _send_prompt(self):
_set_requesting(self, True)
is_remote_url = _svc.is_remote_image_url(prompt)
self.ai_image_status_var.set("下载中…" if is_remote_url else "请求中…")
self.ai_image_status_var.set("下载中…" if is_remote_url else "后端处理中…")
def _success(record):
self.root.after(0, lambda: _on_request_done(self, record, None))
@@ -283,17 +404,6 @@ def _send_prompt(self):
)
return
if not _svc.has_api():
_set_requesting(self, False)
self.ai_image_status_var.set("就绪")
messagebox.showerror(
"API 未配置",
"AI 图片 API 尚未接入。\n"
"可直接输入图片 URL 导入,或在启动时通过 "
"app.services.ai_image.set_api_caller(...) 注入真实实现。",
)
return
_svc.request_image_async(prompt, on_success=_success, on_error=_error)
@@ -301,8 +411,25 @@ def _set_requesting(self, flag: bool):
self._ai_image_requesting = flag
try:
self.ai_image_send_btn.configure(state=tk.DISABLED if flag else tk.NORMAL)
self.ai_image_new_session_btn.configure(state=tk.DISABLED if flag else tk.NORMAL)
except Exception:
pass
if flag:
self._ai_image_progress_phase = 0
self.ai_image_progress.pack(side=tk.LEFT, padx=(8, 0))
self.ai_image_progress.start(10)
if self._ai_image_progress_job is not None:
self.root.after_cancel(self._ai_image_progress_job)
self._ai_image_progress_job = self.root.after(0, lambda: _update_request_progress(self))
else:
if self._ai_image_progress_job is not None:
self.root.after_cancel(self._ai_image_progress_job)
self._ai_image_progress_job = None
try:
self.ai_image_progress.stop()
self.ai_image_progress.pack_forget()
except Exception:
pass
def _on_request_done(self, record, exc):
@@ -314,13 +441,16 @@ def _on_request_done(self, record, exc):
self.ai_image_status_var.set("完成")
self.ai_image_input.delete("1.0", tk.END)
reload_ai_image_list(self)
# 定位到新生成项(最新在前)
if record is not None and self.ai_image_records:
for i, r in enumerate(self.ai_image_records):
for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []):
if ridx is None:
continue
r = self.ai_image_records[ridx]
if r.id == record.id:
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(i)
self.ai_image_listbox.activate(i)
self.ai_image_listbox.selection_set(row)
self.ai_image_listbox.activate(row)
self.ai_image_listbox.see(row)
_select_record(self, r)
break
@@ -358,53 +488,41 @@ def _delete_current(self):
def _rename_current(self):
"""弹窗让用户修改当前记录的备注名称(即列表显示中 size_tag 之后的部分)。"""
"""弹窗让用户修改当前记录的展示标题(保存到侧车 ``title`` 字段,原始 prompt 不变)。"""
rec = getattr(self, "ai_image_current", None)
if rec is None:
messagebox.showinfo("提示", "请先选择一张图片")
return
current_name = rec.prompt or ""
current = rec.title or rec.display_name
new_name = simpledialog.askstring(
"重命名",
"修改备注名称(显示在分辨率标签后面",
initialvalue=current_name,
"修改显示标题(留空可恢复使用原始提示词",
initialvalue=current,
parent=self.root,
)
if new_name is None: # 用户点了取消
if new_name is None: # 取消
return
new_name = new_name.strip()
if not new_name:
messagebox.showwarning("提示", "备注名称不能为空")
return
if new_name == current_name:
if new_name == (rec.title or ""):
return
# 写回 JSON 元数据
try:
import json
meta_path = os.path.splitext(rec.image_path)[0] + ".json"
meta = {}
if os.path.isfile(meta_path):
with open(meta_path, "r", encoding="utf-8") as f:
meta = json.load(f)
meta["prompt"] = new_name
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
except Exception as exc:
messagebox.showerror("保存失败", f"无法更新元数据:\n{exc}")
if not _svc.update_record_title(rec, new_name):
messagebox.showerror("保存失败", "无法更新元数据,请检查文件权限。")
return
# 同步内存中的记录并刷新列表
rec.prompt = new_name
target_id = rec.id
reload_ai_image_list(self)
# 重新定位到刚才被重命名的图片
for i, r in enumerate(self.ai_image_records):
if r.id == rec.id:
# 重新定位
for row, ridx in enumerate(getattr(self, "_ai_image_row_map", []) or []):
if ridx is None:
continue
r = self.ai_image_records[ridx]
if r.id == target_id:
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(i)
self.ai_image_listbox.activate(i)
self.ai_image_listbox.see(i)
self.ai_image_listbox.selection_set(row)
self.ai_image_listbox.activate(row)
self.ai_image_listbox.see(row)
_select_record(self, r)
break
@@ -415,14 +533,16 @@ def _rename_current(self):
def _show_list_context_menu(self, event):
"""在图片列表上显示右键菜单,并根据状态启用/禁用项。"""
try:
idx = self.ai_image_listbox.nearest(event.y)
row = self.ai_image_listbox.nearest(event.y)
except Exception:
idx = -1
if 0 <= idx < len(self.ai_image_records):
row = -1
row_map = getattr(self, "_ai_image_row_map", None) or []
ridx = row_map[row] if 0 <= row < len(row_map) else None
if ridx is not None and 0 <= ridx < len(self.ai_image_records):
self.ai_image_listbox.selection_clear(0, tk.END)
self.ai_image_listbox.selection_set(idx)
self.ai_image_listbox.activate(idx)
_select_record(self, self.ai_image_records[idx])
self.ai_image_listbox.selection_set(row)
self.ai_image_listbox.activate(row)
_select_record(self, self.ai_image_records[ridx])
has_selection = self.ai_image_current is not None
ucd = getattr(self, "ucd", None)