Embedded_game/图片相似度.py

131 lines
4.3 KiB
Python
Raw Normal View History

2025-01-06 09:12:40 +08:00
import os
from skimage.metrics import structural_similarity as ssim
import cv2
import concurrent.futures
def split_file_name(file_name):
    """
    Split a file name shaped like ``<a>_<b>_<c>[.ext]`` into prefix and suffix.

    Only the base name of the path is used, and everything from the first
    ``.`` onwards is discarded before the name is split on ``_``.

    :param file_name: file name or path
    :return: tuple ``("<a>_<b>", "<c>")``; any additional ``_``-separated
        fields after the third are ignored
    :raises IndexError: if the stem has fewer than three ``_``-separated fields
    """
    stem = os.path.basename(file_name).partition('.')[0]
    fields = stem.split('_')
    prefix = '_'.join((fields[0], fields[1]))
    return prefix, fields[2]
def get_sorted_files_group(directory):
    """
    Group the regular files directly inside ``directory`` by name prefix.

    The prefix of each file is the first element returned by
    ``split_file_name``. Files whose names cannot be split are reported and
    skipped, so one stray file does not discard every other group.

    :param directory: path of the directory to scan (not recursive)
    :return: dict mapping prefix -> list of full file paths, each list in
        sorted file-name order; an empty dict if the directory does not
        exist or cannot be read
    """
    try:
        # Bail out early when the directory is missing.
        if not os.path.exists(directory):
            print(f"目录 {directory} 不存在!")
            return {}
        files_group = {}
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # grouping (and everything derived from it) deterministic.
        for name in sorted(os.listdir(directory)):
            path = os.path.join(directory, name)
            if not os.path.isfile(path):
                continue
            try:
                prefix, _ = split_file_name(path)
            except (IndexError, ValueError):
                # IndexError is what split_file_name raises when the name has
                # too few '_'-separated fields; skip only this file.
                print(f"发生错误: 无法解析文件名 {path}")
                continue
            files_group.setdefault(prefix, []).append(path)
        return files_group
    except Exception as err:
        # Best-effort boundary kept from the original: report and return
        # an empty result instead of crashing the script.
        print(f"发生错误: {err}")
        return {}
def compare_images(image1_path, image2_path):
    """
    Compute the structural similarity (SSIM) of two images.

    Both images are loaded as grayscale and resized to a common size (the
    smaller width and the smaller height of the two) before comparison.

    :param image1_path: path of the first image
    :param image2_path: path of the second image
    :return: the mean SSIM value, or ``None`` if either image cannot be read
        or the comparison fails
    """
    try:
        img1 = cv2.imread(image1_path, cv2.IMREAD_GRAYSCALE)
        img2 = cv2.imread(image2_path, cv2.IMREAD_GRAYSCALE)
        # cv2.imread signals failure by returning None; report exactly which
        # path(s) failed, separated so the message stays readable.
        unreadable = [
            str(path)
            for path, img in ((image1_path, img1), (image2_path, img2))
            if img is None
        ]
        if unreadable:
            print(f"无法读取图片: {', '.join(unreadable)}")
            return None
        # Bring both images to the same size; cv2.resize takes (width, height).
        height = min(img1.shape[0], img2.shape[0])
        width = min(img1.shape[1], img2.shape[1])
        img1 = cv2.resize(img1, (width, height))
        img2 = cv2.resize(img2, (width, height))
        # Only the mean score is needed, so skip full=True and avoid
        # building the per-pixel similarity map.
        return ssim(img1, img2)
    except Exception as e:
        # Best-effort: a failed comparison is reported and treated as
        # "no similarity result" by the caller.
        print(f"比较图片时发生错误: {e}")
        return None
def del_group(group):
"""
返回需要删除的图片列表不在函数里直接执行删除操作
"""
del_list = []
compared_pairs = set() # 用于记录已经比较过的图片对
for i in range(len(group)):
for j in range(len(group)):
if i != j and (i, j) not in compared_pairs and (j, i) not in compared_pairs:
# 标记为已比较
compared_pairs.add((i, j))
# group[i] 或 group[j] 已经在 del_list 中就没必要比较了
if group[i] not in del_list and group[j] not in del_list:
similarity = compare_images(group[i], group[j])
if similarity is not None and similarity > 0.965:
# 相似度大于 0.98,判定为重复
# 保留 group[j],删除 group[i],也可以根据需要调换顺序
del_list.append(group[i])
return del_list
def process_group(prefix, file_list):
    """
    Worker function: de-duplicate one group of images.

    Prints progress, delegates the duplicate detection to ``del_group`` and
    hands the result back together with the group's prefix. No file is
    removed here.

    :param prefix: prefix identifying the group
    :param file_list: image paths belonging to the group
    :return: tuple ``(prefix, del_list)``
    """
    total = len(file_list)
    print(f'开始处理 {prefix}, 共 {total} 张图片')
    to_delete = del_group(file_list)
    print(f'需要删除 {len(to_delete)}/{total} 张图片')
    return prefix, to_delete
if __name__ == "__main__":
    # Script entry point: group the frames by prefix, find near-duplicates in
    # each group in parallel, then delete them from the parent process.
    directory = r"C:\Users\10561\Desktop\frames"
    files_group = get_sorted_files_group(directory)
    # Groups are independent of each other, so each one goes to a worker
    # process.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future_to_prefix = {
            executor.submit(process_group, prefix, file_list): prefix
            for prefix, file_list in files_group.items()
        }
        for future in concurrent.futures.as_completed(future_to_prefix):
            prefix = future_to_prefix[future]
            try:
                _, del_list = future.result()
            except Exception as err:
                print(f"{prefix} 处理时发生错误: {err}")
                continue
            # Deletion happens only here, in the main process.
            for path in del_list:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Already gone - nothing left to do for this file.
                    pass
                except OSError as err:
                    # Report and carry on, so one locked or protected file
                    # does not block the rest of the group.
                    print(f"{prefix} 处理时发生错误: {err}")
    print("所有任务已完成!")