修复了一个bug, 这个bug曾经导致选图期间图像重复

This commit is contained in:
jinye_huang 2025-05-09 16:03:23 +08:00
parent 5fa4f0d2d4
commit 961ae5c553

View File

@ -179,10 +179,10 @@ class PosterNotesCreator:
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True,
collage_style: str = None
collage_style: str = "grid_2x2" # 默认使用grid模式
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图处理为3:4比例的拼图
选择未被海报使用的图像作为额外配图使用2x2网格拼接多张图片
Args:
run_id: 运行ID
@ -190,74 +190,199 @@ class PosterNotesCreator:
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 使用的额外图像数量默认为3
num_additional_images: 输出的额外配图数量默认为3
output_filename_template: 输出文件名模板
variation_strength: 变化强度可以是 'low', 'medium', 'high'
extra_effects: 是否应用额外效果
collage_style: 拼图风格可以是 'grid', 'mosaic', 'polaroid', 'slice', 'random'
collage_style: 拼图风格固定为 'grid'
Returns:
List[str]: 保存的图像路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 获取候选图像
# 获取候选图像 - 我们需要至少4*num_additional_images张图片
num_source_images_needed = min(4 * num_additional_images, 12) # 限制最多12张源图
candidate_images = self.get_candidate_images(
poster_metadata_path,
source_image_dir,
num_additional_images
num_source_images_needed
)
if not candidate_images:
logger.warning("没有找到合适的候选图像")
return []
if len(candidate_images) < 4:
logger.warning(f"可用图像数量({len(candidate_images)})少于4张无法创建2x2拼图")
return []
# 生成唯一的随机种子
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
random.seed(seed)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
# 打乱候选图像顺序
random.shuffle(candidate_images)
# 使用多进程并行处理图像
saved_paths = []
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, num_additional_images)) as executor:
# 创建任务
future_to_image = {}
for i, image_filename in enumerate(candidate_images):
image_path = os.path.join(source_image_dir, image_filename)
# 为每个图像创建单独的种子
future_to_image_set = {}
for i in range(num_additional_images):
# 为每个输出选择4张不同的图片
selected_indices = []
# 确保我们有足够的图片可选择
available_indices = list(range(len(candidate_images)))
# 如果图片不够,我们可能需要重复使用一些图片
if len(available_indices) < 4:
selected_indices = available_indices * (4 // len(available_indices) + 1)
selected_indices = selected_indices[:4]
else:
# 随机选择4个不同的索引
selected_indices = random.sample(available_indices, 4)
# 获取对应的图片文件名
selected_images = [candidate_images[idx] for idx in selected_indices]
# 为每个拼图创建单独的种子
image_seed = seed + i
future = executor.submit(
self.process_single_image,
self.process_multiple_images,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
i,
source_image_dir,
selected_images,
i,
output_filename_template.format(index=i+1),
image_seed,
variation_strength,
extra_effects,
collage_style
)
future_to_image[future] = (i, image_filename)
future_to_image_set[future] = (i, selected_images)
# 收集结果
for future in concurrent.futures.as_completed(future_to_image):
i, image_filename = future_to_image[future]
for future in concurrent.futures.as_completed(future_to_image_set):
i, selected_images = future_to_image_set[future]
try:
saved_path = future.result()
if saved_path:
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(candidate_images)}: {saved_path}")
logger.info(f"已保存额外配图 {i+1}/{num_additional_images}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(f"处理图像时出错 '{', '.join(selected_images)}': {e}")
logger.error(traceback.format_exc())
# 重置随机种子
random.seed()
return saved_paths
def process_multiple_images(
self,
run_id,
topic_index,
variant_index,
source_dir,
image_filenames,
index,
output_filename,
seed,
variation_strength,
extra_effects,
collage_style="grid_2x2"
):
"""处理多张图像创建2x2网格拼图"""
try:
# 使用core.simple_collage模块处理图像
style = "grid_2x2" # 固定使用grid风格
# 创建临时目录来存放图像以便传递给process_collage函数
import tempfile
import shutil
with tempfile.TemporaryDirectory() as temp_dir:
# 复制选中的图像到临时目录
temp_image_paths = []
for img_filename in image_filenames:
src_path = os.path.join(source_dir, img_filename)
dst_path = os.path.join(temp_dir, img_filename)
shutil.copy2(src_path, dst_path)
temp_image_paths.append(dst_path)
logger.info(f"为网格拼图准备了 {len(temp_image_paths)} 张图像: {', '.join(image_filenames)}")
# 设置随机种子以确保结果一致性
if seed is not None:
random.seed(seed)
np.random.seed(seed)
# 调用core.simple_collage模块处理图像
target_size = (900, 1200) # 3:4比例
collage_images, used_image_filenames = process_collage(
temp_dir,
style=style,
target_size=target_size,
output_count=1,
)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
if not collage_images or len(collage_images) == 0:
logger.error(f"拼图模块没有生成有效的图像")
return None
processed_image = collage_images[0]
# 确保图像是RGB模式解决"cannot write mode RGBA as JPEG"错误
if processed_image.mode == 'RGBA':
logger.debug(f"将RGBA图像转换为RGB模式")
# 创建白色背景并粘贴RGBA图像
background = Image.new('RGB', processed_image.size, (255, 255, 255))
background.paste(processed_image, mask=processed_image.split()[3]) # 使用alpha通道作为mask
processed_image = background
elif processed_image.mode != 'RGB':
logger.debug(f"{processed_image.mode}图像转换为RGB模式")
processed_image = processed_image.convert('RGB')
# 创建元数据
additional_metadata = {
"original_images": image_filenames,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"collage_style": style,
"grid_size": "2x2"
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理多张图像时出错: {e}")
logger.error(traceback.format_exc())
return None
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
# 检查输入路径是否存在
@ -334,7 +459,7 @@ class PosterNotesCreator:
seed,
variation_strength,
extra_effects,
collage_style="slice"
collage_style="grid_2x2"
):
"""处理单张图像 - 此方法可在独立进程中运行"""
try:
@ -413,267 +538,7 @@ class PosterNotesCreator:
logger.error(traceback.format_exc())
return None
def create_collage_image(
self,
image: Image.Image,
target_ratio: Tuple[int, int] = (3, 4),
style: str = "grid",
seed: int = None
) -> Image.Image:
"""
创建拼图图像
Args:
image: 原始图像
target_ratio: 目标宽高比默认为(3, 4)
style: 拼图风格, 可以是 "grid", "mosaic", "polaroid", "slice", "random"
seed: 随机种子
Returns:
Image.Image: 拼图后的图像
"""
logger.info(f"创建拼图图像,风格: {style}")
# 设置随机种子
if seed is not None:
random.seed(seed)
np.random.seed(seed)
# 调整图像为目标比例
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 调整大小
if current_ratio > target_ratio_value: # 图片较宽
new_height = 1200
new_width = int(new_height * current_ratio)
else: # 图片较高
new_width = 900
new_height = int(new_width / current_ratio)
# 高效调整尺寸
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 裁剪为目标比例
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value:
crop_width = int(resized_height * target_ratio_value)
crop_x = (resized_width - crop_width) // 2
image_base = resized_image.crop((crop_x, 0, crop_x + crop_width, resized_height))
else:
crop_height = int(resized_width / target_ratio_value)
crop_y = (resized_height - crop_height) // 2
image_base = resized_image.crop((0, crop_y, resized_width, crop_y + crop_height))
# 确保base是RGB模式
image_base = image_base.convert('RGB')
base_width, base_height = image_base.size
# 根据指定风格创建拼图
if style == "random":
# 随机选择一种风格
style = random.choice(["grid", "mosaic", "polaroid", "slice"])
if style == "grid":
# 网格风格: 将图像分割为网格并随机调整每个网格块
grid_size = random.randint(2, 4) # 网格大小
cell_width = base_width // grid_size
cell_height = base_height // grid_size
# 创建空白画布
collage = Image.new('RGB', (base_width, base_height), (255, 255, 255))
# 处理每个网格
for row in range(grid_size):
for col in range(grid_size):
# 计算当前网格的位置
left = col * cell_width
top = row * cell_height
right = left + cell_width
bottom = top + cell_height
# 裁剪当前网格块
cell = image_base.crop((left, top, right, bottom))
# 随机应用效果
effect = random.choice(["rotate", "brightness", "contrast", "none"])
if effect == "rotate":
angle = random.uniform(-5, 5)
cell = cell.rotate(angle, resample=Image.BICUBIC, expand=False)
elif effect == "brightness":
factor = random.uniform(0.9, 1.1)
enhancer = ImageEnhance.Brightness(cell)
cell = enhancer.enhance(factor)
elif effect == "contrast":
factor = random.uniform(0.9, 1.1)
enhancer = ImageEnhance.Contrast(cell)
cell = enhancer.enhance(factor)
# 添加细小边框
border_size = random.randint(1, 3)
bordered_cell = Image.new('RGB', (cell.width + border_size*2, cell.height + border_size*2), (255, 255, 255))
bordered_cell.paste(cell, (border_size, border_size))
# 粘贴回拼图
collage.paste(bordered_cell.resize((cell_width, cell_height)), (left, top))
elif style == "mosaic":
# 马赛克风格: 创建不规则大小的方块组成的拼图
collage = Image.new('RGB', (base_width, base_height), (255, 255, 255))
# 创建不同大小的块
blocks = []
min_size = min(base_width, base_height) // 6
max_size = min(base_width, base_height) // 3
# 生成一些随机块
for _ in range(20): # 尝试放置20个块
block_width = random.randint(min_size, max_size)
block_height = random.randint(min_size, max_size)
left = random.randint(0, base_width - block_width)
top = random.randint(0, base_height - block_height)
blocks.append((left, top, left + block_width, top + block_height))
# 按面积排序块,大的先放
blocks.sort(key=lambda b: (b[2]-b[0])*(b[3]-b[1]), reverse=True)
# 放置每个块
for block in blocks:
left, top, right, bottom = block
# 裁剪原图对应区域
cell = image_base.crop(block)
# 随机应用滤镜
filter_type = random.choice(["none", "blur", "sharpen", "edge_enhance"])
if filter_type == "blur":
cell = cell.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.3, 0.8)))
elif filter_type == "sharpen":
cell = cell.filter(ImageFilter.SHARPEN)
elif filter_type == "edge_enhance":
cell = cell.filter(ImageFilter.EDGE_ENHANCE)
# 添加边框
border_size = random.randint(2, 5)
bordered_cell = Image.new('RGB', (cell.width + border_size*2, cell.height + border_size*2), (240, 240, 240))
bordered_cell.paste(cell, (border_size, border_size))
# 粘贴到拼图上
collage.paste(bordered_cell, (left, top))
elif style == "polaroid":
# 宝丽来风格: 模拟拍立得照片排列
collage = Image.new('RGB', (base_width, base_height), (245, 245, 245))
# 创建2-4张宝丽来照片
num_photos = random.randint(2, 4)
# 根据照片数量确定大小
if num_photos == 2:
photo_width = base_width // 2 - 20
photo_height = int(photo_width * 0.9) # 稍扁一些
elif num_photos == 3:
photo_width = base_width // 3 - 15
photo_height = int(photo_width * 0.9)
else: # 4
photo_width = base_width // 2 - 15
photo_height = base_height // 2 - 15
# 根据照片数量计算布局
layouts = []
if num_photos == 2:
# 水平放置两张
layouts = [
(10, (base_height - photo_height) // 2),
(base_width - photo_width - 10, (base_height - photo_height) // 2 + random.randint(-10, 10))
]
elif num_photos == 3:
# 水平放置三张
layouts = [
(10, (base_height - photo_height) // 2 + random.randint(-15, 15)),
((base_width - photo_width) // 2, (base_height - photo_height) // 2 + random.randint(-5, 5)),
(base_width - photo_width - 10, (base_height - photo_height) // 2 + random.randint(-15, 15))
]
else: # 4
# 2x2网格放置
layouts = [
(10, 10),
(base_width - photo_width - 10, 10 + random.randint(-5, 5)),
(10 + random.randint(-5, 5), base_height - photo_height - 10),
(base_width - photo_width - 10, base_height - photo_height - 10 + random.randint(-5, 5))
]
# 创建并放置每张宝丽来照片
for i, (left, top) in enumerate(layouts):
# 为每张照片选择不同区域
src_left = random.randint(0, base_width - photo_width)
src_top = random.randint(0, base_height - photo_height)
photo = image_base.crop((src_left, src_top, src_left + photo_width, src_top + photo_height))
# 应用宝丽来效果
# 1. 提高对比度
enhancer = ImageEnhance.Contrast(photo)
photo = enhancer.enhance(1.2)
# 2. 轻微增加亮度
enhancer = ImageEnhance.Brightness(photo)
photo = enhancer.enhance(1.05)
# 3. 创建宝丽来边框
border_width = 10
frame = Image.new('RGB', (photo_width + border_width*2, photo_height + border_width*4), (255, 255, 255))
frame.paste(photo, (border_width, border_width))
# 4. 随机旋转
angle = random.uniform(-5, 5)
frame = frame.rotate(angle, resample=Image.BICUBIC, expand=False)
# 粘贴到拼图中
collage.paste(frame, (left, top))
elif style == "slice":
# 切片风格: 水平或垂直切片并错开
# 固定使用水平切片
collage = Image.new('RGB', (base_width, base_height), (255, 255, 255))
# 水平切片
num_slices = 6 # 固定切片数量
slice_height = base_height // num_slices
for i in range(num_slices):
# 切出原图片片
top = i * slice_height
bottom = min(top + slice_height, base_height)
slice_img = image_base.crop((0, top, base_width, bottom))
# 只在奇数行应用固定偏移,让效果更整齐
offset = 20 if i % 2 == 1 else 0 # 固定偏移量,更可控
# 粘贴到拼图,带偏移
paste_left = offset
if paste_left > 0:
# 如果右侧偏移超出边界,需要裁剪
if paste_left + slice_img.width > base_width:
slice_img = slice_img.crop((0, 0, base_width - paste_left, slice_img.height))
collage.paste(slice_img, (paste_left, top))
else:
# 默认情况,直接返回调整后的图像
logger.warning(f"未知的拼图风格: {style},使用原始图像")
collage = image_base
# 清理元数据
result = self.strip_metadata(collage)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
logger.info(f"拼图图像创建完成,风格: {style}")
return result
def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
"""
@ -1775,10 +1640,10 @@ def select_additional_images(
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True,
collage_style: str = None
collage_style: str = "grid_2x2" # 默认使用grid风格
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图并处理为3:4比例的拼图
选择未被海报使用的图像作为额外配图创建2x2网格拼接图像
Args:
run_id: 运行ID
@ -1791,12 +1656,12 @@ def select_additional_images(
output_filename_template: 输出文件名模板
variation_strength: 变化强度
extra_effects: 是否应用额外效果
collage_style: 拼图风格可以是 'grid', 'mosaic', 'polaroid', 'slice', 'random'
collage_style: 拼图风格固定为 'grid'
Returns:
List[str]: 保存的图像路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图2x2网格风格")
# 验证输入
if not os.path.exists(poster_metadata_path):