TravelContentCreator/utils/poster_notes_creator.py

577 lines
22 KiB
Python
Raw Normal View History

2025-04-26 14:53:54 +08:00
import os
import random
import logging
import json
from PIL import Image
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
2025-04-26 14:53:54 +08:00
from .output_handler import OutputHandler
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图并随机选择额外的图片作为笔记图片
确保选择的笔记图片与海报中使用的图片不重复
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""选择未被海报使用的图像作为额外配图并处理为3:4比例"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
2025-04-26 14:53:54 +08:00
# 获取候选图像
candidate_images = self.get_candidate_images(
poster_metadata_path,
source_image_dir,
num_additional_images
)
if not candidate_images:
logger.warning("没有找到合适的候选图像")
return []
2025-04-26 14:53:54 +08:00
# 生成唯一的随机种子
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
# 使用多进程并行处理图像
saved_paths = []
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
# 创建任务
future_to_image = {}
for i, image_filename in enumerate(candidate_images):
image_path = os.path.join(source_image_dir, image_filename)
# 为每个图像创建单独的种子
image_seed = seed + i
future = executor.submit(
self.process_single_image,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
i,
source_image_dir,
output_filename_template.format(index=i+1),
image_seed,
variation_strength,
extra_effects
)
future_to_image[future] = (i, image_filename)
# 收集结果
for future in concurrent.futures.as_completed(future_to_image):
i, image_filename = future_to_image[future]
try:
saved_path = future.result()
if saved_path:
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(candidate_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
2025-04-26 14:53:54 +08:00
return saved_paths
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
2025-04-26 14:53:54 +08:00
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_images:
2025-04-26 14:53:54 +08:00
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_images})"
2025-04-26 14:53:54 +08:00
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
random.seed(sum(map(ord, ''.join(available_images)))) # 确保结果一致性
selected_images = random.sample(available_images, num_images)
random.seed() # 重置随机种子
2025-04-26 14:53:54 +08:00
return selected_images
def process_single_image(
self,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
index,
source_dir,
output_filename,
seed,
variation_strength,
extra_effects
):
"""处理单张图像 - 此方法可在独立进程中运行"""
try:
# 加载图像
image = Image.open(image_path)
# 处理图像为3:4比例并添加微小变化
processed_image = self.optimized_process_image(
image,
(3, 4),
add_variation=True,
seed=seed,
variation_strength=variation_strength,
extra_effects=extra_effects
)
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"variation_applied": True,
"variation_strength": variation_strength,
"extra_effects": extra_effects
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
return None
def optimized_process_image(
2025-04-26 15:53:44 +08:00
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 15:53:44 +08:00
) -> Image.Image:
"""优化后的图像处理方法,使用更高效的算法"""
# 设置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed(seed)
np.random.seed(seed)
2025-04-26 15:53:44 +08:00
# 根据微调强度设置参数
2025-04-26 15:53:44 +08:00
if variation_strength == "low":
brightness_factor = random.uniform(0.97, 1.03)
contrast_factor = random.uniform(0.97, 1.03)
saturation_factor = random.uniform(0.97, 1.03)
2025-04-26 15:53:44 +08:00
max_rotation = 0.5
border_size = random.randint(0, 1)
use_extra = random.random() < 0.3 and extra_effects
2025-04-26 15:53:44 +08:00
elif variation_strength == "high":
brightness_factor = random.uniform(0.92, 1.08)
contrast_factor = random.uniform(0.92, 1.08)
saturation_factor = random.uniform(0.92, 1.08)
2025-04-26 15:53:44 +08:00
max_rotation = 2.0
border_size = random.randint(0, 3)
use_extra = extra_effects
else: # medium
brightness_factor = random.uniform(0.95, 1.05)
contrast_factor = random.uniform(0.95, 1.05)
saturation_factor = random.uniform(0.95, 1.05)
2025-04-26 15:53:44 +08:00
max_rotation = 1.0
border_size = random.randint(0, 2)
use_extra = random.random() < 0.7 and extra_effects
2025-04-26 15:53:44 +08:00
# 调整图像为目标比例
2025-04-26 15:53:44 +08:00
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 调整大小
2025-04-26 15:53:44 +08:00
if current_ratio > target_ratio_value: # 图片较宽
new_height = 1200
2025-04-26 15:53:44 +08:00
new_width = int(new_height * current_ratio)
else: # 图片较高
new_width = 900
2025-04-26 15:53:44 +08:00
new_height = int(new_width / current_ratio)
# 高效调整尺寸
2025-04-26 15:53:44 +08:00
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 裁剪为目标比例
2025-04-26 15:53:44 +08:00
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value:
2025-04-26 15:53:44 +08:00
crop_width = int(resized_height * target_ratio_value)
max_offset = max(1, min(10, (resized_width - crop_width) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
2025-04-26 15:53:44 +08:00
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else:
2025-04-26 15:53:44 +08:00
crop_height = int(resized_width / target_ratio_value)
max_offset = max(1, min(10, (resized_height - crop_height) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
2025-04-26 15:53:44 +08:00
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果不需要变化或是低强度且禁用额外效果
if not add_variation or (variation_strength == "low" and not use_extra):
# 重置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed()
np.random.seed()
2025-04-26 15:53:44 +08:00
return result
# 高效应用基本变化
processed_image = result.convert('RGB')
2025-04-26 15:53:44 +08:00
# 1. 亮度调整
if abs(brightness_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Brightness(processed_image)
processed_image = enhancer.enhance(brightness_factor)
2025-04-26 15:53:44 +08:00
# 2. 对比度调整
if abs(contrast_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Contrast(processed_image)
processed_image = enhancer.enhance(contrast_factor)
# 3. 饱和度调整
if abs(saturation_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Color(processed_image)
processed_image = enhancer.enhance(saturation_factor)
# 4. 旋转 (只在中高强度时应用)
if variation_strength != "low" and abs(max_rotation) > 0.1:
rotation_angle = random.uniform(-max_rotation, max_rotation)
if abs(rotation_angle) > 0.1: # 只有当角度足够大时才旋转
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 应用额外效果 (只在需要时)
if use_extra:
# 根据强度决定是否应用特定效果
apply_sharpen = random.random() < 0.4
apply_blur = not apply_sharpen and random.random() < 0.3
2025-04-26 15:53:44 +08:00
# 锐化
if apply_sharpen:
enhancer = ImageEnhance.Sharpness(processed_image)
sharpness = 1.2 if variation_strength == "high" else 1.1
processed_image = enhancer.enhance(sharpness)
2025-04-26 15:53:44 +08:00
# 模糊
elif apply_blur:
radius = 0.7 if variation_strength == "high" else 0.4
processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
2025-04-26 15:53:44 +08:00
# 边框处理 (在图像不太小的情况下)
if border_size > 0 and min(processed_image.size) > 300:
border_color = (
random.randint(0, 5),
random.randint(0, 5),
random.randint(0, 5)
)
w, h = processed_image.size
bordered = Image.new('RGB', (w + border_size*2, h + border_size*2), border_color)
bordered.paste(processed_image, (border_size, border_size))
2025-04-26 15:53:44 +08:00
# 随机裁剪回原尺寸
offset_x = random.randint(0, border_size*2)
offset_y = random.randint(0, border_size*2)
processed_image = bordered.crop((offset_x, offset_y, offset_x + w, offset_y + h))
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
return processed_image
2025-04-26 14:53:54 +08:00
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""
2025-04-26 15:53:44 +08:00
选择未被海报使用的图像作为额外配图并处理为3:4比例
2025-04-26 14:53:54 +08:00
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例
2025-04-26 14:53:54 +08:00
creator = PosterNotesCreator(output_handler)
# 使用优化后的方法处理图像
2025-04-26 14:53:54 +08:00
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
2025-04-26 15:53:44 +08:00
output_filename_template,
variation_strength,
extra_effects
2025-04-26 14:53:54 +08:00
)