TravelContentCreator/utils/poster_notes_creator.py

786 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import random
import logging
import json
from PIL import Image
import traceback
from typing import List, Tuple, Dict, Any, Optional
from .output_handler import OutputHandler
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图,并随机选择额外的图片作为笔记图片。
确保选择的笔记图片与海报中使用的图片不重复。
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例,用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图并处理为3:4比例
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要选择的额外图像数量
output_filename_template: 输出文件名模板
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果
Returns:
List[str]: 保存的额外配图路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 生成唯一的随机种子基于run_id、topic_index和variant_index
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
random.seed(seed)
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
# 重置随机种子,不影响其他随机操作
random.seed()
logger.info(f"已选择 {len(selected_images)} 张图像作为额外配图")
logger.info(f"微调强度: {variation_strength}, 额外效果: {'启用' if extra_effects else '禁用'}")
# 保存选择的额外配图
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 处理图像为3:4比例并添加微小变化
# 使用不同的种子确保每个图像的变化各不相同
variation_seed = seed + i if 'seed' in locals() else i + 1
processed_image = self.process_image_to_aspect_ratio(
image,
(3, 4),
add_variation=True,
seed=variation_seed,
variation_strength=variation_strength,
extra_effects=extra_effects
)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": i + 1,
"source_dir": source_image_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"variation_applied": True,
"variation_strength": variation_strength,
"extra_effects": extra_effects
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def process_image_to_aspect_ratio(
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium", # 新参数: 微调强度 - "low", "medium", "high"
extra_effects: bool = True # 新参数: 是否添加额外效果
) -> Image.Image:
"""
处理图像到指定的宽高比,并添加微小变化
Args:
image: 原始图像
target_ratio: 目标宽高比,如(3, 4)表示3:4的比例
add_variation: 是否添加微小变化以避免哈希检测
seed: 随机种子,用于确保变化的可重复性
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果(噪点、微透视变换等)
Returns:
Image.Image: 处理后的图像
"""
# 如果指定了种子,设置随机种子
if seed is not None:
random.seed(seed)
# 根据微调强度设置参数范围
if variation_strength == "low":
brightness_range = (-0.03, 0.03)
contrast_range = (-0.03, 0.03)
saturation_range = (-0.03, 0.03)
hue_range = (-0.01, 0.01)
max_crop_px = 3
max_rotation = 0.5
noise_intensity = 0.01
border_size_range = (0, 2)
elif variation_strength == "high":
brightness_range = (-0.08, 0.08)
contrast_range = (-0.08, 0.08)
saturation_range = (-0.08, 0.08)
hue_range = (-0.02, 0.02)
max_crop_px = 8
max_rotation = 2.0
noise_intensity = 0.03
border_size_range = (0, 4)
else: # medium (默认)
brightness_range = (-0.05, 0.05)
contrast_range = (-0.05, 0.05)
saturation_range = (-0.05, 0.05)
hue_range = (-0.015, 0.015)
max_crop_px = 5
max_rotation = 1.0
noise_intensity = 0.02
border_size_range = (0, 3)
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 不再直接调整尺寸而是先resize然后进行轻微裁剪
# 第一步:先调整大小,使较短边符合目标尺寸
if current_ratio > target_ratio_value: # 图片较宽
# 先根据高度调整大小
new_height = 1200 # 可以根据需要调整目标尺寸
new_width = int(new_height * current_ratio)
else: # 图片较高
# 先根据宽度调整大小
new_width = 900 # 可以根据需要调整目标尺寸
new_height = int(new_width / current_ratio)
# 调整尺寸
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 第二步:计算要裁剪的区域,使最终比例为目标比例
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value: # 调整后的图片较宽
# 需要裁剪宽度
crop_width = int(resized_height * target_ratio_value)
# 添加微小变化:不完全居中裁剪,而是稍微偏移
if add_variation:
max_offset = max(1, min(20, (resized_width - crop_width) // 5)) # 最大偏移量
offset = random.randint(-max_offset, max_offset)
else:
offset = 0
# 确保裁剪区域在图像内
crop_x1 = (resized_width - crop_width) // 2 + offset
crop_x1 = max(0, min(crop_x1, resized_width - crop_width))
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else: # 调整后的图片较高
# 需要裁剪高度
crop_height = int(resized_width / target_ratio_value)
# 添加微小变化:不完全居中裁剪,而是稍微偏移
if add_variation:
max_offset = max(1, min(20, (resized_height - crop_height) // 5)) # 最大偏移量
offset = random.randint(-max_offset, max_offset)
else:
offset = 0
# 确保裁剪区域在图像内
crop_y1 = (resized_height - crop_height) // 2 + offset
crop_y1 = max(0, min(crop_y1, resized_height - crop_height))
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果需要添加微小变化
if add_variation:
# 转换为RGB模式进行处理
if result.mode != 'RGB':
processed_image = result.convert('RGB')
else:
processed_image = result.copy()
# 1. 微调亮度
brightness_factor = 1.0 + random.uniform(*brightness_range)
processed_image = self._adjust_brightness(processed_image, brightness_factor)
# 2. 微调对比度
contrast_factor = 1.0 + random.uniform(*contrast_range)
processed_image = self._adjust_contrast(processed_image, contrast_factor)
# 3. 微调饱和度
saturation_factor = 1.0 + random.uniform(*saturation_range)
processed_image = self._adjust_saturation(processed_image, saturation_factor)
# 4. 微小裁剪调整
crop_px = random.randint(0, max_crop_px)
if crop_px > 0:
width, height = processed_image.size
processed_image = processed_image.crop((crop_px, crop_px, width-crop_px, height-crop_px))
processed_image = processed_image.resize((width, height), Image.LANCZOS)
# 5. 微小旋转
rotation_angle = random.uniform(-max_rotation, max_rotation)
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 6. 额外效果 (如果启用)
if extra_effects:
# 6.1 添加微弱噪点
processed_image = self._add_noise(processed_image, intensity=noise_intensity)
# 6.2 微小色相调整
processed_image = self._adjust_hue(processed_image, shift=random.uniform(*hue_range))
# 6.3 随机边缘微调 - 随机添加1-3像素的边缘
border_size = random.randint(*border_size_range)
if border_size > 0:
processed_image = self._add_border_and_crop(processed_image, border_size)
# 6.4 随机进行细微的锐化或模糊处理
if random.random() > 0.5:
processed_image = self._slight_sharpen(processed_image)
else:
processed_image = self._slight_blur(processed_image)
# 重置随机种子,避免影响其他操作
if seed is not None:
random.seed()
return processed_image
else:
# 重置随机种子,避免影响其他操作
if seed is not None:
random.seed()
return result
def _adjust_hue(self, image: Image.Image, shift: float) -> Image.Image:
"""调整图像色相"""
if shift == 0.0:
return image
try:
# 使用PIL的色相调整
from PIL import ImageEnhance, ImageOps
import colorsys
# 获取像素数据
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 转换为HSV
h, s, v = colorsys.rgb_to_hsv(r/255, g/255, b/255)
# 调整色相 (H 是 0-1 的值)
h = (h + shift) % 1.0
# 转回RGB
r, g, b = colorsys.hsv_to_rgb(h, s, v)
r = int(r * 255)
g = int(g * 255)
b = int(b * 255)
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
except (ImportError, AttributeError):
# 如果无法使用上述方法,返回原图
return image
def _add_noise(self, image: Image.Image, intensity: float = 0.02) -> Image.Image:
"""添加微弱噪点intensity控制噪点强度(0-1)"""
if intensity <= 0:
return image
# 获取像素数据
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 添加随机噪点
noise_r = random.randint(-int(intensity * 255), int(intensity * 255))
noise_g = random.randint(-int(intensity * 255), int(intensity * 255))
noise_b = random.randint(-int(intensity * 255), int(intensity * 255))
r = max(0, min(255, r + noise_r))
g = max(0, min(255, g + noise_g))
b = max(0, min(255, b + noise_b))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _add_border_and_crop(self, image: Image.Image, border_size: int) -> Image.Image:
"""添加边框然后裁剪回原尺寸,用于改变边缘像素"""
if border_size <= 0:
return image
width, height = image.size
# 创建略大的画布
border_color = (
random.randint(0, 10),
random.randint(0, 10),
random.randint(0, 10)
)
bordered = Image.new(image.mode, (width + border_size*2, height + border_size*2), border_color)
bordered.paste(image, (border_size, border_size))
# 随机裁剪回原尺寸
offset_x = random.randint(0, border_size*2)
offset_y = random.randint(0, border_size*2)
result = bordered.crop((offset_x, offset_y, offset_x + width, offset_y + height))
return result
def _slight_sharpen(self, image: Image.Image) -> Image.Image:
"""轻微锐化图像"""
try:
from PIL import ImageEnhance
enhancer = ImageEnhance.Sharpness(image)
return enhancer.enhance(1.2) # 轻微锐化1.0是原始锐度
except (ImportError, AttributeError):
return image
def _slight_blur(self, image: Image.Image) -> Image.Image:
"""轻微模糊图像"""
try:
from PIL import ImageFilter
return image.filter(ImageFilter.GaussianBlur(radius=0.5))
except (ImportError, AttributeError):
return image
def _adjust_brightness(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像亮度"""
if factor == 1.0:
return image
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
r = min(255, max(0, int(r * factor)))
g = min(255, max(0, int(g * factor)))
b = min(255, max(0, int(b * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _adjust_contrast(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像对比度"""
if factor == 1.0:
return image
data = list(image.getdata())
new_data = []
# 计算平均亮度
avg_r, avg_g, avg_b = 0, 0, 0
count = 0
for pixel in data:
r, g, b = pixel[:3]
avg_r += r
avg_g += g
avg_b += b
count += 1
if count > 0:
avg_r //= count
avg_g //= count
avg_b //= count
# 调整对比度
for pixel in data:
r, g, b = pixel[:3]
r = min(255, max(0, int(avg_r + (r - avg_r) * factor)))
g = min(255, max(0, int(avg_g + (g - avg_g) * factor)))
b = min(255, max(0, int(avg_b + (b - avg_b) * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def _adjust_saturation(self, image: Image.Image, factor: float) -> Image.Image:
"""调整图像饱和度"""
if factor == 1.0:
return image
# 转换为HSV色彩空间调整S通道然后转回RGB
try:
# 使用内部方法,效率更高
from PIL import ImageEnhance
enhancer = ImageEnhance.Color(image)
return enhancer.enhance(factor)
except ImportError:
# 如果PIL没有提供相关功能使用自定义实现
data = list(image.getdata())
new_data = []
for pixel in data:
r, g, b = pixel[:3]
# 计算灰度值
gray = (r + g + b) // 3
# 调整饱和度
r = min(255, max(0, int(gray + (r - gray) * factor)))
g = min(255, max(0, int(gray + (g - gray) * factor)))
b = min(255, max(0, int(gray + (b - gray) * factor)))
if len(pixel) > 3: # 如果有alpha通道
new_data.append((r, g, b, pixel[3]))
else:
new_data.append((r, g, b))
result = Image.new(image.mode, image.size)
result.putdata(new_data)
return result
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图并处理为3:4比例
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要选择的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
variation_strength: 微调强度 - "low", "medium", "high"
extra_effects: 是否添加额外效果
Returns:
List[str]: 保存的额外配图路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template,
variation_strength,
extra_effects
)