類似画像検索

初めに

次のサイトを参考にしました

https://note.com/tora_no_oya/n/na875d41885be

イメージ

プログラム

import os
import threading
import shutil
import configparser
from tkinter import (Tk, filedialog, messagebox, Spinbox, PanedWindow, Canvas)
from tkinter import ttk
from PIL import Image, ImageTk
import imagehash
import subprocess

# File name of the settings file read by App.load_ini and written by
# App.save_ini. It is a relative path, so it resolves against the process's
# current working directory.
INI_FILE = "settings.ini"

# -------------------------------------------------------------------------
# 類似画像検索(phash)
# -------------------------------------------------------------------------
def find_duplicate_images(directory, threshold_distance, progress_callback):
    """Group visually similar images found under *directory*.

    Every image file (matched by extension, searched recursively) is hashed
    with ``imagehash.phash``. Files are then grouped greedily: the first
    ungrouped file becomes the representative of a group, and every later
    ungrouped file whose hash differs from the representative's by at most
    ``threshold_distance`` joins that group.

    Args:
        directory: Root folder to scan.
        threshold_distance: Maximum hash difference (as returned by
            ``h1 - h2``) for two images to be considered similar.
        progress_callback: Called as ``progress_callback(message, percent)``.
            Hashing reports 0-50, comparison reports 50-100.

    Returns:
        A list of groups. Each group is a list of ``(filepath, hash)`` tuples
        with at least two members; the first member is the representative.
        Returns ``[]`` when no image files are found.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'}

    files_to_process = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(directory)
        for name in names
        if os.path.splitext(name)[1].lower() in image_extensions
    ]

    total_files = len(files_to_process)
    if total_files == 0:
        return []

    hashes = {}
    for position, filepath in enumerate(files_to_process, start=1):
        try:
            with Image.open(filepath) as img:
                hashes[filepath] = imagehash.phash(img.convert('RGB'))
        except Exception:
            # Best effort: an unreadable or corrupt file is skipped.
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            pass
        # Report progress for skipped files too, so the bar keeps advancing.
        progress_callback(f"画像を分析中: {position}/{total_files}",
                          position / total_files * 50)

    duplicates = []
    checked = set()
    items = list(hashes.items())
    item_count = len(items)

    for i, (f1, h1) in enumerate(items):
        if f1 in checked:
            continue

        group = [(f1, h1)]
        for f2, h2 in items[i + 1:]:
            if f2 in checked:
                continue
            if h1 - h2 <= threshold_distance:
                group.append((f2, h2))
                checked.add(f2)

        if len(group) > 1:
            duplicates.append(group)
            checked.add(f1)

        progress_callback(f"画像を比較中: {i+1}/{item_count}",
                          50 + (i + 1) / item_count * 50)

    return duplicates


# -------------------------------------------------------------------------
# GUI
# -------------------------------------------------------------------------
class App(Tk):
    def __init__(self):
        """Create the main window, load saved settings and build all widgets.

        Layout, top to bottom: a settings row (folder, similarity, thumbnail
        size, ffmpeg path), a row of action buttons, a progress label/bar and
        a horizontally split main area with the result tree on the left and
        the image preview plus thumbnail grid on the right.
        """
        super().__init__()
        self.title("類似画像検索")
        self.geometry("1300x900")

        # Application state
        self.current_directory = ""        # folder that is searched
        self.duplicates = []               # groups from find_duplicate_images
        self.current_group_filepaths = []  # paths shown in the thumbnail grid
        self.thumbnail_images = []         # PhotoImage objects of the grid
        self.move_log = []                 # (source, destination) move records
        self.ffmpeg_path = ""
        self.video_path = ""
        self.similarity_percent = "95"
        self.thumb_size = "120"

        # UI style
        style = ttk.Style()
        style.configure("Big.TButton", font=("Meiryo", 12), padding=10)

        # Load the ini file (overwrites the defaults assigned above)
        self.load_ini()

        # ------------------------------
        # Upper row: settings area
        # ------------------------------
        settings_top = ttk.Frame(self, padding=10)
        settings_top.pack(fill="x")

        ttk.Label(settings_top, text="フォルダ:").pack(side="left")
        self.dir_entry = ttk.Entry(settings_top, width=45)
        self.dir_entry.pack(side="left", padx=5)
        if self.current_directory:
            self.dir_entry.insert(0, self.current_directory)

        ttk.Button(settings_top, text="参照", style="Big.TButton",
                   command=self.select_directory).pack(side="left", padx=5)

        ttk.Label(settings_top, text="類似度(%):").pack(side="left", padx=(20, 5))
        self.similarity_spinbox = Spinbox(settings_top, from_=0, to=100, width=6, justify="center")
        # Replace the spinbox's initial content with the saved value.
        self.similarity_spinbox.delete(0, "end")
        self.similarity_spinbox.insert(0, self.similarity_percent)
        self.similarity_spinbox.config(font=("Meiryo", 14))
        self.similarity_spinbox.pack(side="left")

        ttk.Label(settings_top, text="サムネイル:").pack(side="left", padx=(20, 5))
        self.thumb_size_spinbox = Spinbox(
            settings_top, from_=60, to=240, width=6, justify="center",
            command=self.on_thumb_size_change
        )
        self.thumb_size_spinbox.delete(0, "end")
        self.thumb_size_spinbox.insert(0, self.thumb_size)
        self.thumb_size_spinbox.config(font=("Meiryo", 14))
        self.thumb_size_spinbox.pack(side="left")

        ttk.Label(settings_top, text="ffmpeg.exe:").pack(side="left", padx=(20, 5))
        self.ffmpeg_entry = ttk.Entry(settings_top, width=40)
        self.ffmpeg_entry.pack(side="left", padx=5)
        if self.ffmpeg_path:
            self.ffmpeg_entry.insert(0, self.ffmpeg_path)

        ttk.Button(settings_top, text="参照", style="Big.TButton",
                   command=self.select_ffmpeg).pack(side="left", padx=5)

        # ------------------------------
        # Lower row: action buttons (revised)
        # ------------------------------
        settings_bottom = ttk.Frame(self, padding=10)
        settings_bottom.pack(fill="x")

        ttk.Button(settings_bottom, text="検索開始", style="Big.TButton",
                   command=self.start_search).pack(side="left", padx=10)

        ttk.Button(settings_bottom, text="フォルダ分け", style="Big.TButton",
                   command=self.auto_group_folders).pack(side="left", padx=5)

        ttk.Button(settings_bottom, text="元に戻す", style="Big.TButton",
                   command=self.undo_group_folders).pack(side="left", padx=5)

        ttk.Button(settings_bottom, text="動画を読み込む", style="Big.TButton",
                   command=self.select_video).pack(side="left", padx=10)

        ttk.Button(settings_bottom, text="動画→画像分割", style="Big.TButton",
                   command=self.split_video).pack(side="left", padx=10)

        ttk.Button(settings_bottom, text="グループ動画作成", style="Big.TButton",
                   command=self.create_group_videos).pack(side="left", padx=10)

        # Second to last: delete the selected image
        ttk.Button(settings_bottom, text="選択画像を削除", style="Big.TButton",
                   command=self.delete_selected_file).pack(side="left", padx=10)

        # Last: quit button (currently disabled)
        # ttk.Button(settings_bottom, text="終了", style="Big.TButton",
        #            command=self.quit).pack(side="left", padx=10)

        # ------------------------------
        # Progress display
        # ------------------------------
        progress_frame = ttk.Frame(self, padding=(10, 0))
        progress_frame.pack(fill="x")

        self.progress_label = ttk.Label(progress_frame, text="待機中")
        self.progress_label.pack(fill="x")

        self.progress_bar = ttk.Progressbar(progress_frame, mode="determinate")
        self.progress_bar.pack(fill="x")

        # ------------------------------
        # Main area (split left / right)
        # ------------------------------
        paned = PanedWindow(self, orient="horizontal")
        paned.pack(expand=True, fill="both", padx=10, pady=10)

        left = ttk.Frame(paned)
        paned.add(left, width=450)

        # Left pane: result tree. The "filepath" column is given width 0; it
        # carries each row's full path, which the handlers read back from the
        # row's values.
        self.tree = ttk.Treeview(
            left,
            columns=("filepath", "similarity", "distance"),
            show="tree headings"
        )
        self.tree.heading("#0", text="グループ / ファイル名")
        self.tree.heading("similarity", text="類似度(%)")
        self.tree.heading("distance", text="距離")
        self.tree.column("filepath", width=0, stretch=False)
        self.tree.column("similarity", width=100, anchor="center")
        self.tree.column("distance", width=60, anchor="center")

        vsb = ttk.Scrollbar(left, orient="vertical", command=self.tree.yview)
        self.tree.configure(yscrollcommand=vsb.set)

        self.tree.pack(side="left", expand=True, fill="both")
        vsb.pack(side="right", fill="y")

        self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)

        # Right pane: large preview, info text and a scrollable thumbnail grid.
        right = ttk.Frame(paned)
        paned.add(right)

        self.image_label = ttk.Label(right, text="画像を選択してください", anchor="center")
        self.image_label.pack(expand=True, fill="both")

        self.info_label = ttk.Label(right, text="", wraplength=500, justify="left")
        self.info_label.pack(fill="x", pady=5)

        thumb_container = ttk.Frame(right)
        thumb_container.pack(expand=True, fill="both")

        # The thumbnails live in a frame embedded in a canvas so that they can
        # be scrolled vertically.
        self.thumb_canvas = Canvas(thumb_container)
        self.thumb_scroll = ttk.Scrollbar(thumb_container, orient="vertical",
                                          command=self.thumb_canvas.yview)
        self.thumb_frame = ttk.Frame(self.thumb_canvas)

        self.thumb_canvas.create_window((0, 0), window=self.thumb_frame, anchor="nw")
        self.thumb_canvas.configure(yscrollcommand=self.thumb_scroll.set)

        self.thumb_canvas.pack(side="left", expand=True, fill="both")
        self.thumb_scroll.pack(side="right", fill="y")

        # Keep the scroll region in sync with the size of the thumbnail frame.
        self.thumb_frame.bind(
            "<Configure>",
            lambda e: self.thumb_canvas.configure(scrollregion=self.thumb_canvas.bbox("all"))
        )

        # Re-flow the thumbnail grid when the canvas is resized.
        self.thumb_canvas.bind("<Configure>", self.on_thumb_canvas_resize)
    # -------------------------------------------------------------------------
    # ini 読み込み / 保存
    # -------------------------------------------------------------------------
    def load_ini(self):
        """Load saved settings from ``INI_FILE`` into instance attributes.

        Sets ``current_directory``, ``similarity_percent``, ``thumb_size`` and
        ``ffmpeg_path`` from the ``[settings]`` section. A missing file, a
        missing option, a malformed or mis-encoded file, or a value with
        invalid ``%`` interpolation all fall back to the built-in default, so
        a damaged settings file cannot prevent the window from opening.
        """
        config = configparser.ConfigParser()
        try:
            # ConfigParser.read silently skips files that do not exist.
            config.read(INI_FILE, encoding="utf-8")
        except (configparser.Error, UnicodeDecodeError, OSError):
            # Discard anything that was parsed before the error.
            config = configparser.ConfigParser()

        def read_option(name, default):
            # ``fallback`` only covers missing sections/options; interpolation
            # errors are raised from get() and must be caught separately.
            try:
                return config.get("settings", name, fallback=default)
            except configparser.Error:
                return default

        self.current_directory = read_option("directory", "")
        self.similarity_percent = read_option("similarity", "95")
        self.thumb_size = read_option("thumb_size", "120")
        self.ffmpeg_path = read_option("ffmpeg", "")

    def save_ini(self):
        """Write the current settings to ``INI_FILE`` (UTF-8).

        The folder comes from ``self.current_directory``; similarity,
        thumbnail size and ffmpeg path are read from their widgets.

        Raises:
            OSError: If the settings file cannot be written.
        """
        def escape(value):
            # ConfigParser treats "%" as interpolation syntax and rejects a
            # lone "%" on assignment; "%%" is stored and read back as one
            # literal percent sign.
            return value.replace("%", "%%")

        config = configparser.ConfigParser()
        config["settings"] = {
            "directory": escape(self.current_directory),
            "similarity": escape(self.similarity_spinbox.get()),
            "thumb_size": escape(self.thumb_size_spinbox.get()),
            "ffmpeg": escape(self.ffmpeg_entry.get()),
        }
        with open(INI_FILE, "w", encoding="utf-8") as f:
            config.write(f)

    # -------------------------------------------------------------------------
    # フォルダ / ffmpeg / 動画 選択
    # -------------------------------------------------------------------------
    def select_directory(self):
        """Ask for a folder, show it in the folder entry and save settings.

        Does nothing when the dialog returns an empty result.
        """
        chosen = filedialog.askdirectory()
        if not chosen:
            return

        entry = self.dir_entry
        entry.delete(0, "end")
        entry.insert(0, chosen)

        self.current_directory = chosen
        self.save_ini()

    def select_ffmpeg(self):
        """Ask for the ffmpeg executable, show it in its entry and save settings.

        Does nothing when the dialog returns an empty result.
        """
        dialog_options = {
            "title": "ffmpeg.exe を選択",
            "filetypes": [("ffmpeg.exe", "*.exe")],
        }
        chosen = filedialog.askopenfilename(**dialog_options)
        if not chosen:
            return

        entry = self.ffmpeg_entry
        entry.delete(0, "end")
        entry.insert(0, chosen)
        # save_ini reads the path back from the entry widget.
        self.save_ini()

    def select_video(self):
        """Ask for a video file and remember its path in ``self.video_path``.

        Shows a confirmation dialog with the chosen path; does nothing when
        the dialog returns an empty result.
        """
        # NOTE(review): the ';'-separated pattern presumably relies on the
        # Windows native file dialog — confirm behaviour on other platforms.
        video_types = [("動画ファイル", "*.mp4;*.mov;*.avi;*.mkv;*.webm")]
        chosen = filedialog.askopenfilename(
            title="動画ファイルを選択",
            filetypes=video_types,
        )
        if not chosen:
            return

        self.video_path = chosen
        messagebox.showinfo("動画読み込み", f"動画を読み込みました:\n{chosen}")

    # -------------------------------------------------------------------------
    # 類似画像検索
    # -------------------------------------------------------------------------
    def start_search(self):
        """Validate the inputs, reset the result views and start a search.

        The similarity percentage is converted to a maximum hash distance on
        a 64-step scale (100% -> 0, 0% -> 64). The search itself runs in a
        daemon thread via ``run_search_thread``.

        Shows an error dialog and returns without searching when the folder
        does not exist or the similarity is not an integer from 0 to 100.
        """
        directory = self.dir_entry.get()
        if not os.path.isdir(directory):
            messagebox.showerror("エラー", "フォルダを選択してください")
            return

        try:
            sim = int(self.similarity_spinbox.get())
        except ValueError:
            sim = None
        # The range check matters: a value above 100 would give a negative
        # threshold (nothing ever matches) and a negative value a threshold
        # above 64 (everything matches).
        if sim is None or not 0 <= sim <= 100:
            messagebox.showerror("エラー", "類似度は0〜100で入力してください")
            return

        threshold_distance = round((100 - sim) / 100 * 64)

        self.current_directory = directory
        self.save_ini()

        # Clear everything that belongs to the previous search.
        self.tree.delete(*self.tree.get_children())
        self.image_label.config(text="検索中...", image="")
        self.info_label.config(text="")
        self.current_group_filepaths = []
        self.duplicates = []
        self.move_log = []

        for w in self.thumb_frame.winfo_children():
            w.destroy()

        threading.Thread(
            target=self.run_search_thread,
            args=(directory, threshold_distance),
            daemon=True
        ).start()

    def run_search_thread(self, directory, threshold_distance):
        """Thread target: run the search and schedule the UI follow-up.

        Results, or the text of any exception, are passed on through
        ``self.after(0, ...)``; ``search_complete`` is always scheduled last.
        """
        # NOTE(review): update_progress is handed to the search directly, so
        # it is invoked from this worker thread — confirm that is acceptable
        # for the Tk build in use.
        schedule = self.after
        try:
            groups = find_duplicate_images(
                directory, threshold_distance, self.update_progress
            )
            schedule(0, self.display_results, groups)
        except Exception as exc:
            schedule(0, messagebox.showerror, "エラー", str(exc))
        finally:
            schedule(0, self.search_complete)

    def update_progress(self, msg, val):
        """Show *msg* in the progress label and set the progress bar to *val*."""
        self.progress_label.configure(text=msg)
        self.progress_bar.configure(value=val)

    def search_complete(self):
        """Reset the progress label to its idle text and empty the bar."""
        self.progress_label.configure(text="待機中")
        self.progress_bar.configure(value=0)

    # -------------------------------------------------------------------------
    # 結果表示
    # -------------------------------------------------------------------------
    def display_results(self, duplicates):
        """Store *duplicates* and list them in the result tree.

        Each group becomes an expanded top-level row and each member a child
        row carrying ``(filepath, similarity, distance)``. Distance is the
        hash difference to the group's first member; similarity is that
        distance mapped onto 0-100% over a 64-step scale. When there are no
        groups, only an information dialog is shown.
        """
        self.duplicates = duplicates

        if not duplicates:
            messagebox.showinfo("結果", "類似画像は見つかりませんでした")
            return

        add_row = self.tree.insert
        for number, members in enumerate(duplicates, start=1):
            group_row = add_row(
                "", "end",
                text=f"グループ {number} ({len(members)}枚)",
                open=True,
                values=("", "", ""),
            )

            reference_hash = members[0][1]

            for path, image_hash in members:
                distance = image_hash - reference_hash
                percent = int(round(100 - (distance / 64) * 100))
                percent = max(0, min(100, percent))  # clamp to 0..100

                add_row(
                    group_row, "end",
                    text=f"  {os.path.basename(path)}",
                    values=(path, f"{percent}%", distance),
                )

    # -------------------------------------------------------------------------
    # サムネイル表示
    # -------------------------------------------------------------------------
    def show_thumbnails(self, filepaths):
        """Rebuild the thumbnail grid for *filepaths*.

        Existing thumbnails are destroyed first. The thumbnail size comes from
        the size spinbox (120 when the value is not a positive integer), and
        the number of columns is derived from the current canvas width. A
        file that cannot be loaded gets a text placeholder instead of an
        image. Clicking a thumbnail shows that image in the large preview.

        Args:
            filepaths: Paths of the images to show; also stored in
                ``self.current_group_filepaths``.
        """
        for w in self.thumb_frame.winfo_children():
            w.destroy()

        self.thumbnail_images = []
        self.current_group_filepaths = filepaths

        try:
            size = int(self.thumb_size_spinbox.get())
        except ValueError:
            size = 120
        if size <= 0:
            # A typed-in zero or negative size cannot produce a thumbnail.
            size = 120

        canvas_width = max(1, self.thumb_canvas.winfo_width())
        col_width = size + 20
        columns = max(1, canvas_width // col_width)

        for i, fp in enumerate(filepaths):
            row, column = divmod(i, columns)
            try:
                # Close the file as soon as the pixels have been copied into
                # the Tk image, so an open handle never blocks a later move
                # or delete of the same file.
                with Image.open(fp) as img:
                    img.thumbnail((size, size))
                    photo = ImageTk.PhotoImage(img)
            except Exception:
                lbl = ttk.Label(self.thumb_frame, text="読み込み不可")
                lbl.grid(row=row, column=column, padx=5, pady=5)
                continue

            self.thumbnail_images.append(photo)

            lbl = ttk.Label(self.thumb_frame, image=photo,
                            text=os.path.basename(fp), compound="top")
            lbl.grid(row=row, column=column, padx=5, pady=5)

            # Bind fp as a default argument so each label keeps its own path.
            lbl.bind("<Button-1>", lambda e, f=fp: self.on_thumbnail_click(f))

    def on_thumb_canvas_resize(self, event):
        if self.current_group_filepaths:
            self.show_thumbnails(self.current_group_filepaths)

    def on_thumbnail_click(self, filepath):
        """Show the clicked thumbnail's image in the preview, with its path."""
        self.show_large_image(filepath)
        self.info_label.configure(text=filepath)

    # -------------------------------------------------------------------------
    # 大きい画像表示
    # -------------------------------------------------------------------------
    def show_large_image(self, filepath):
        """Show *filepath* in the preview label, scaled to fit 600x600.

        If the image cannot be loaded, the exception text is shown in the
        label instead.
        """
        try:
            # Close the file once the pixels are copied into the Tk image, so
            # an open handle never blocks a later move or delete.
            with Image.open(filepath) as img:
                img.thumbnail((600, 600))
                photo = ImageTk.PhotoImage(img)
            self.image_label.config(image=photo, text="")
            # The PhotoImage is also stored on the label as an attribute.
            self.image_label.image = photo
        except Exception as e:
            self.image_label.config(text=str(e), image="")
            self.image_label.image = None

    # -------------------------------------------------------------------------
    # TreeView 選択
    # -------------------------------------------------------------------------
    def on_tree_select(self, event):
        """React to a selection change in the result tree.

        Only the first selected row is used. A group row shows the
        thumbnails of all its members; a file row shows that image in the
        large preview together with its similarity and distance.
        """
        selection = self.tree.selection()
        if not selection:
            return

        node = selection[0]

        if self.tree.parent(node) != "":
            # File row: values are (filepath, similarity, distance).
            path, similarity, distance = self.tree.item(node, "values")
            self.show_large_image(path)
            self.info_label.config(text=f"{path}\n類似度: {similarity}  距離: {distance}")
            return

        # Group row: collect the file path stored in each child row.
        member_paths = []
        for child in self.tree.get_children(node):
            member_paths.append(self.tree.item(child, "values")[0])

        self.show_thumbnails(member_paths)
        self.image_label.config(image="", text="")
        self.info_label.config(text="グループ内の画像")

    # -------------------------------------------------------------------------
    # サムネイルサイズ変更
    # -------------------------------------------------------------------------
    def on_thumb_size_change(self):
        """Redraw the visible thumbnails, if any, then save the settings."""
        paths = self.current_group_filepaths
        if paths:
            self.show_thumbnails(paths)

        self.save_ini()

    # -------------------------------------------------------------------------
    # フォルダ分け
    # -------------------------------------------------------------------------
    def auto_group_folders(self):
        if not self.current_directory or not self.duplicates:
            messagebox.showinfo("情報", "まず検索してください")
            return

        base = self.current_directory
        self.move_log = []

        try:
            for i, group in enumerate(self.duplicates, start=1):
                folder = os.path.join(base, f"group_{i:03d}")
                os.makedirs(folder, exist_ok=True)

                for fp, _ in group:
                    if not os.path.exists(fp):
                        continue

                    dst = os.path.join(folder, os.path.basename(fp))
                    self.move_log.append((fp, dst))
                    shutil.move(fp, dst)

            messagebox.showinfo("完了", "フォルダ分けしました")

        except Exception as e:
            messagebox.showerror("エラー", str(e))

    # -------------------------------------------------------------------------
    # Undo(元に戻す)
    # -------------------------------------------------------------------------
    def undo_group_folders(self):
        """Move grouped files back to where they came from.

        Replays ``self.move_log`` newest-first, recreating the original
        folder when needed and skipping files that are no longer at their
        moved location. The log is cleared only when every step succeeded;
        an error is reported in a dialog and leaves the log untouched.
        """
        if not self.move_log:
            messagebox.showinfo("情報", "元に戻す記録がありません")
            return

        try:
            for original_path, moved_path in self.move_log[::-1]:
                if not os.path.exists(moved_path):
                    continue
                os.makedirs(os.path.dirname(original_path), exist_ok=True)
                shutil.move(moved_path, original_path)

            self.move_log = []
            messagebox.showinfo("完了", "元に戻しました")

        except Exception as exc:
            messagebox.showerror("エラー", str(exc))

    # -------------------------------------------------------------------------
    # 選択画像を削除(最後から2番目)
    # -------------------------------------------------------------------------
    def delete_selected_file(self):
        """Delete the image file selected in the result tree, after confirming.

        Only file rows can be deleted; selecting a group row shows an error.
        On success the row is removed from the tree, the preview and the
        thumbnail grid are cleared, and the file is dropped from the cached
        search results so later operations no longer refer to it.
        """
        sel = self.tree.selection()
        if not sel:
            messagebox.showinfo("情報", "削除する画像を選択してください")
            return

        item = sel[0]
        parent = self.tree.parent(item)

        if parent == "":
            messagebox.showerror("エラー", "グループ全体は削除できません")
            return

        fp = self.tree.item(item, "values")[0]

        if not os.path.exists(fp):
            messagebox.showerror("エラー", "ファイルが存在しません")
            return

        if not messagebox.askyesno("確認", f"本当に削除しますか?\n{fp}"):
            return

        try:
            os.remove(fp)
            self.tree.delete(item)
            self.image_label.config(image="", text="削除されました")
            self.info_label.config(text="")

            for w in self.thumb_frame.winfo_children():
                w.destroy()

            # The grid is now empty; forget its paths too, otherwise the next
            # canvas resize would redraw the group including the deleted file.
            self.current_group_filepaths = []

            # Drop the file from the search results. Groups keep their
            # position (even when they become empty) so that group numbers
            # still match the tree.
            self.duplicates = [
                [(path, h) for path, h in group if path != fp]
                for group in self.duplicates
            ]

        except Exception as e:
            messagebox.showerror("エラー", str(e))

    # -------------------------------------------------------------------------
    # グループ動画作成(PNG → 連番 → FFmpeg)
    # -------------------------------------------------------------------------
    def create_group_videos(self):
        """Create one MP4 per similar-image group with ffmpeg.

        For every group the member images are converted to consecutively
        numbered PNG frames in ``group_NNN_frames`` next to the loaded video,
        then encoded at 30 fps to ``group_NNN.mp4`` in the same folder. The
        loaded video is used only to decide where the output goes.

        Images that cannot be converted are reported on stdout and skipped
        without leaving a gap in the frame numbering. Groups without any
        usable frame are skipped. An ffmpeg failure is shown in a dialog and
        stops processing.
        """
        ffmpeg = self.ffmpeg_entry.get()
        if not os.path.isfile(ffmpeg):
            messagebox.showerror("エラー", "ffmpeg.exe のパスが正しくありません")
            return

        if not self.duplicates:
            messagebox.showinfo("情報", "まず検索してください")
            return

        if not self.video_path:
            messagebox.showerror("エラー", "元動画が読み込まれていません")
            return

        video_dir = os.path.dirname(self.video_path)

        for i, group in enumerate(self.duplicates, start=1):
            group_folder = os.path.join(video_dir, f"group_{i:03d}_frames")
            os.makedirs(group_folder, exist_ok=True)

            # Count only frames that were actually written: the image
            # sequence reader stops at the first missing number, so a gap
            # would silently truncate the video.
            frame_count = 0
            for fp, _ in group:
                out_path = os.path.join(group_folder, f"{frame_count + 1:06d}.png")
                try:
                    with Image.open(fp) as img:
                        img.convert("RGB").save(out_path)
                except Exception as e:
                    print("変換エラー:", e)
                    continue
                frame_count += 1

            if frame_count == 0:
                # Nothing to encode for this group.
                continue

            out_video = os.path.join(video_dir, f"group_{i:03d}.mp4")

            cmd = [
                ffmpeg,
                "-y",
                "-framerate", "30",
                # Frames are numbered from 1.
                "-start_number", "1",
                "-i", "%06d.png",
                # Stop after this run's frames so that leftover PNGs from an
                # earlier, larger run are not encoded.
                "-frames:v", str(frame_count),
                "-pix_fmt", "yuv420p",
                out_video
            ]

            try:
                # Run inside the frame folder so the relative input pattern
                # resolves there.
                subprocess.run(cmd, cwd=group_folder, check=True)
            except Exception as e:
                messagebox.showerror("エラー", str(e))
                return

        messagebox.showinfo("完了", "グループ動画を作成しました")

    # -------------------------------------------------------------------------
    # ffmpeg 動画 → 画像分割
    # -------------------------------------------------------------------------
    def split_video(self):
        """Split the loaded video into numbered PNG frames with ffmpeg.

        Frames are written to ``<current_directory>/video_frames/<video name
        without extension>/``. Requires a valid ffmpeg path, a loaded video
        and a configured image folder; each missing prerequisite is reported
        in an error dialog. ffmpeg runs synchronously, and any failure is
        shown in a dialog.
        """
        ffmpeg_exe = self.ffmpeg_entry.get()
        if not os.path.isfile(ffmpeg_exe):
            messagebox.showerror("エラー", "ffmpeg.exe のパスが正しくありません")
            return

        if not self.video_path:
            messagebox.showerror("エラー", "動画が読み込まれていません")
            return

        if not self.current_directory:
            messagebox.showerror("エラー", "類似画像フォルダが設定されていません")
            return

        video_file = os.path.basename(self.video_path)
        stem, _extension = os.path.splitext(video_file)
        frames_dir = os.path.join(self.current_directory, "video_frames", stem)
        os.makedirs(frames_dir, exist_ok=True)

        command = [
            ffmpeg_exe,
            "-i", self.video_path,
            os.path.join(frames_dir, "%06d.png"),
        ]

        try:
            self.progress_label.configure(text="動画を画像に分割中...")
            self.progress_bar.configure(value=0)
            # Process pending events so the status text is drawn before the
            # blocking ffmpeg call.
            self.update()

            subprocess.run(command, check=True)

            self.progress_label.configure(text="完了")
            self.progress_bar.configure(value=100)

            messagebox.showinfo("完了", f"画像を分割しました:\n{frames_dir}")

        except Exception as exc:
            messagebox.showerror("エラー", str(exc))


# -------------------------------------------------------------------------
# 実行
# -------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the main window, then hand control to the Tk event loop.
    window = App()
    window.mainloop()

Python,画像

Posted by eightban