TravelContentCreator/utils/poster_notes_creator.py

786 lines
30 KiB
Python
Raw Normal View History

2025-04-26 14:53:54 +08:00
import os
import random
import logging
import json
from PIL import Image
import traceback
from typing import List, Tuple, Dict, Any, Optional
from .output_handler import OutputHandler
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图并随机选择额外的图片作为笔记图片
确保选择的笔记图片与海报中使用的图片不重复
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""
2025-04-26 15:53:44 +08:00
选择未被海报使用的图像作为额外配图并处理为3:4比例
2025-04-26 14:53:54 +08:00
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要选择的额外图像数量
output_filename_template: 输出文件名模板
2025-04-26 15:53:44 +08:00
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果
2025-04-26 14:53:54 +08:00
Returns:
List[str]: 保存的额外配图路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
2025-04-26 15:53:44 +08:00
# 生成唯一的随机种子基于run_id、topic_index和variant_index
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
random.seed(seed)
2025-04-26 14:53:54 +08:00
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
2025-04-26 15:53:44 +08:00
# 重置随机种子,不影响其他随机操作
random.seed()
2025-04-26 14:53:54 +08:00
logger.info(f"已选择 {len(selected_images)} 张图像作为额外配图")
2025-04-26 15:53:44 +08:00
logger.info(f"微调强度: {variation_strength}, 额外效果: {'启用' if extra_effects else '禁用'}")
2025-04-26 14:53:54 +08:00
# 保存选择的额外配图
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
2025-04-26 15:53:44 +08:00
# 处理图像为3:4比例并添加微小变化
# 使用不同的种子确保每个图像的变化各不相同
variation_seed = seed + i if 'seed' in locals() else i + 1
processed_image = self.process_image_to_aspect_ratio(
image,
(3, 4),
add_variation=True,
seed=variation_seed,
variation_strength=variation_strength,
extra_effects=extra_effects
)
2025-04-26 14:53:54 +08:00
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": i + 1,
"source_dir": source_image_dir,
2025-04-26 15:53:44 +08:00
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"variation_applied": True,
"variation_strength": variation_strength,
"extra_effects": extra_effects
2025-04-26 14:53:54 +08:00
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
2025-04-26 15:53:44 +08:00
processed_image,
2025-04-26 14:53:54 +08:00
output_filename,
additional_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
2025-04-26 15:53:44 +08:00
def process_image_to_aspect_ratio(
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium", # 新参数: 微调强度 - "low", "medium", "high"
extra_effects: bool = True # 新参数: 是否添加额外效果
) -> Image.Image:
"""
处理图像到指定的宽高比并添加微小变化
Args:
image: 原始图像
target_ratio: 目标宽高比(3, 4)表示3:4的比例
add_variation: 是否添加微小变化以避免哈希检测
seed: 随机种子用于确保变化的可重复性
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果噪点微透视变换等
Returns:
Image.Image: 处理后的图像
"""
# 如果指定了种子,设置随机种子
if seed is not None:
random.seed(seed)
# 根据微调强度设置参数范围
if variation_strength == "low":
brightness_range = (-0.03, 0.03)
contrast_range = (-0.03, 0.03)
saturation_range = (-0.03, 0.03)
hue_range = (-0.01, 0.01)
max_crop_px = 3
max_rotation = 0.5
noise_intensity = 0.01
border_size_range = (0, 2)
elif variation_strength == "high":
brightness_range = (-0.08, 0.08)
contrast_range = (-0.08, 0.08)
saturation_range = (-0.08, 0.08)
hue_range = (-0.02, 0.02)
max_crop_px = 8
max_rotation = 2.0
noise_intensity = 0.03
border_size_range = (0, 4)
else: # medium (默认)
brightness_range = (-0.05, 0.05)
contrast_range = (-0.05, 0.05)
saturation_range = (-0.05, 0.05)
hue_range = (-0.015, 0.015)
max_crop_px = 5
max_rotation = 1.0
noise_intensity = 0.02
border_size_range = (0, 3)
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 不再直接调整尺寸而是先resize然后进行轻微裁剪
# 第一步:先调整大小,使较短边符合目标尺寸
if current_ratio > target_ratio_value: # 图片较宽
# 先根据高度调整大小
new_height = 1200 # 可以根据需要调整目标尺寸
new_width = int(new_height * current_ratio)
else: # 图片较高
# 先根据宽度调整大小
new_width = 900 # 可以根据需要调整目标尺寸
new_height = int(new_width / current_ratio)
# 调整尺寸
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 第二步:计算要裁剪的区域,使最终比例为目标比例
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value: # 调整后的图片较宽
# 需要裁剪宽度
crop_width = int(resized_height * target_ratio_value)
# 添加微小变化:不完全居中裁剪,而是稍微偏移
if add_variation:
max_offset = max(1, min(20, (resized_width - crop_width) // 5)) # 最大偏移量
offset = random.randint(-max_offset, max_offset)
else:
offset = 0
# 确保裁剪区域在图像内
crop_x1 = (resized_width - crop_width) // 2 + offset
crop_x1 = max(0, min(crop_x1, resized_width - crop_width))
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else: # 调整后的图片较高
# 需要裁剪高度
crop_height = int(resized_width / target_ratio_value)
# 添加微小变化:不完全居中裁剪,而是稍微偏移
if add_variation:
max_offset = max(1, min(20, (resized_height - crop_height) // 5)) # 最大偏移量
offset = random.randint(-max_offset, max_offset)
else:
offset = 0
# 确保裁剪区域在图像内
crop_y1 = (resized_height - crop_height) // 2 + offset
crop_y1 = max(0, min(crop_y1, resized_height - crop_height))
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果需要添加微小变化
if add_variation:
# 转换为RGB模式进行处理
if result.mode != 'RGB':
processed_image = result.convert('RGB')
else:
processed_image = result.copy()
# 1. 微调亮度
brightness_factor = 1.0 + random.uniform(*brightness_range)
processed_image = self._adjust_brightness(processed_image, brightness_factor)
# 2. 微调对比度
contrast_factor = 1.0 + random.uniform(*contrast_range)
processed_image = self._adjust_contrast(processed_image, contrast_factor)
# 3. 微调饱和度
saturation_factor = 1.0 + random.uniform(*saturation_range)
processed_image = self._adjust_saturation(processed_image, saturation_factor)
# 4. 微小裁剪调整
crop_px = random.randint(0, max_crop_px)
if crop_px > 0:
width, height = processed_image.size
processed_image = processed_image.crop((crop_px, crop_px, width-crop_px, height-crop_px))
processed_image = processed_image.resize((width, height), Image.LANCZOS)
# 5. 微小旋转
rotation_angle = random.uniform(-max_rotation, max_rotation)
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 6. 额外效果 (如果启用)
if extra_effects:
# 6.1 添加微弱噪点
processed_image = self._add_noise(processed_image, intensity=noise_intensity)
# 6.2 微小色相调整
processed_image = self._adjust_hue(processed_image, shift=random.uniform(*hue_range))
# 6.3 随机边缘微调 - 随机添加1-3像素的边缘
border_size = random.randint(*border_size_range)
if border_size > 0:
processed_image = self._add_border_and_crop(processed_image, border_size)
# 6.4 随机进行细微的锐化或模糊处理
if random.random() > 0.5:
processed_image = self._slight_sharpen(processed_image)
else:
processed_image = self._slight_blur(processed_image)
# 重置随机种子,避免影响其他操作
if seed is not None:
random.seed()
return processed_image
else:
# 重置随机种子,避免影响其他操作
if seed is not None:
random.seed()
return result
def _adjust_hue(self, image: Image.Image, shift: float) -> Image.Image:
"""调整图像色相"""
if shift == 0.0:
return image
try:
# 使用PIL的色相调整
from PIL import ImageEnhance, ImageOps
import colorsys
# 获取像素数据
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 转换为HSV
h, s, v = colorsys.rgb_to_hsv(r/255, g/255, b/255)
# 调整色相 (H 是 0-1 的值)
h = (h + shift) % 1.0
# 转回RGB
r, g, b = colorsys.hsv_to_rgb(h, s, v)
r = int(r * 255)
g = int(g * 255)
b = int(b * 255)
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
except (ImportError, AttributeError):
# 如果无法使用上述方法,返回原图
return image
def _add_noise(self, image: Image.Image, intensity: float = 0.02) -> Image.Image:
"""添加微弱噪点intensity控制噪点强度(0-1)"""
if intensity <= 0:
return image
# 获取像素数据
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 添加随机噪点
noise_r = random.randint(-int(intensity * 255), int(intensity * 255))
noise_g = random.randint(-int(intensity * 255), int(intensity * 255))
noise_b = random.randint(-int(intensity * 255), int(intensity * 255))
r = max(0, min(255, r + noise_r))
g = max(0, min(255, g + noise_g))
b = max(0, min(255, b + noise_b))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _add_border_and_crop(self, image: Image.Image, border_size: int) -> Image.Image:
"""添加边框然后裁剪回原尺寸,用于改变边缘像素"""
if border_size <= 0:
return image
width, height = image.size
# 创建略大的画布
border_color = (
random.randint(0, 10),
random.randint(0, 10),
random.randint(0, 10)
)
bordered = Image.new(image.mode, (width + border_size*2, height + border_size*2), border_color)
bordered.paste(image, (border_size, border_size))
# 随机裁剪回原尺寸
offset_x = random.randint(0, border_size*2)
offset_y = random.randint(0, border_size*2)
result = bordered.crop((offset_x, offset_y, offset_x + width, offset_y + height))
return result
def _slight_sharpen(self, image: Image.Image) -> Image.Image:
"""轻微锐化图像"""
try:
from PIL import ImageEnhance
enhancer = ImageEnhance.Sharpness(image)
return enhancer.enhance(1.2) # 轻微锐化1.0是原始锐度
except (ImportError, AttributeError):
return image
def _slight_blur(self, image: Image.Image) -> Image.Image:
"""轻微模糊图像"""
try:
from PIL import ImageFilter
return image.filter(ImageFilter.GaussianBlur(radius=0.5))
except (ImportError, AttributeError):
return image
def _adjust_brightness(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像亮度"""
if factor == 1.0:
return image
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
r = min(255, max(0, int(r * factor)))
g = min(255, max(0, int(g * factor)))
b = min(255, max(0, int(b * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _adjust_contrast(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像对比度"""
if factor == 1.0:
return image
data = list(image.getdata())
new_data = []
# 计算平均亮度
avg_r, avg_g, avg_b = 0, 0, 0
count = 0
for pixel in data:
r, g, b = pixel[:3]
avg_r += r
avg_g += g
avg_b += b
count += 1
if count > 0:
avg_r //= count
avg_g //= count
avg_b //= count
# 调整对比度
for pixel in data:
r, g, b = pixel[:3]
r = min(255, max(0, int(avg_r + (r - avg_r) * factor)))
g = min(255, max(0, int(avg_g + (g - avg_g) * factor)))
b = min(255, max(0, int(avg_b + (b - avg_b) * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _adjust_saturation(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像饱和度"""
if factor == 1.0:
return image
# 转换为HSV色彩空间调整S通道然后转回RGB
try:
# 使用内部方法,效率更高
from PIL import ImageEnhance
enhancer = ImageEnhance.Color(image)
return enhancer.enhance(factor)
except ImportError:
# 如果PIL没有提供相关功能使用自定义实现
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 计算灰度值
gray = (r + g + b) // 3
# 调整饱和度
r = min(255, max(0, int(gray + (r - gray) * factor)))
g = min(255, max(0, int(gray + (g - gray) * factor)))
b = min(255, max(0, int(gray + (b - gray) * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
2025-04-26 14:53:54 +08:00
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""
2025-04-26 15:53:44 +08:00
选择未被海报使用的图像作为额外配图并处理为3:4比例
2025-04-26 14:53:54 +08:00
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要选择的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
2025-04-26 15:53:44 +08:00
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果
2025-04-26 14:53:54 +08:00
Returns:
List[str]: 保存的额外配图路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
2025-04-26 15:53:44 +08:00
output_filename_template,
variation_strength,
extra_effects
2025-04-26 14:53:54 +08:00
)