# TravelContentCreator/utils/poster_notes_creator.py

import os
import random
import logging
import json
from PIL import Image, ImageChops
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
from .output_handler import OutputHandler
import io

# Try to import scipy; if it is unavailable, flag it so DCT-based steps can be skipped.
try:
    from scipy.fftpack import dct, idct
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False
    dct = None
    idct = None

logger = logging.getLogger(__name__)


class PosterNotesCreator:
    """
    Uses the original poster as the main image and randomly selects additional
    images to serve as note images, making sure the selected note images do not
    duplicate the images already used in the poster.
    """

    def __init__(self, output_handler: OutputHandler):
        """
        Initialize the PosterNotesCreator.

        Args:
            output_handler: OutputHandler instance used to persist the output.
        """
        self.output_handler = output_handler
        logger.info("PosterNotesCreator initialized")

    def create_notes_images(
        self,
        run_id: str,
        topic_index: int,
        variant_index: int,
        poster_image_path: str,
        poster_metadata_path: str,
        source_image_dir: str,
        num_additional_images: int,
        output_filename_template: str = "note_{index}.jpg"
    ) -> List[str]:
        """
        Create note images.

        Args:
            run_id: Run ID.
            topic_index: Topic index.
            variant_index: Variant index.
            poster_image_path: Path to the poster image.
            poster_metadata_path: Path to the poster metadata.
            source_image_dir: Directory containing the source images.
            num_additional_images: Number of additional images to use.
            output_filename_template: Template for the output file names.

        Returns:
            List[str]: Paths of the saved note images.
        """
        # Check that the input paths exist
        if not os.path.exists(poster_image_path):
            logger.error(f"Poster image does not exist: {poster_image_path}")
            return []
        if not os.path.exists(poster_metadata_path):
            logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
            return []
        if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
            logger.error(f"Source image directory does not exist: {source_image_dir}")
            return []

        # Read the list of already-used images from the metadata file
        try:
            with open(poster_metadata_path, 'r', encoding='utf-8') as f:
                poster_metadata = json.load(f)
        except Exception as e:
            logger.error(f"Unable to read poster metadata: {e}")
            return []

        # Collect the images that were already used in the poster
        used_images = []
        if 'collage_images' in poster_metadata:
            used_images = poster_metadata['collage_images']
            logger.info(f"Poster already uses {len(used_images)} images: {', '.join(used_images)}")

        # List all image files in the source directory
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        available_images = [
            f for f in os.listdir(source_image_dir)
            if os.path.isfile(os.path.join(source_image_dir, f)) and
            f.lower().endswith(image_extensions)
        ]
        if not available_images:
            logger.error(f"No images found in source directory: {source_image_dir}")
            return []
        logger.info(f"Found {len(available_images)} images in source directory")

        # Filter out images already used in the poster
        available_images = [img for img in available_images if img not in used_images]
        if not available_images:
            logger.warning("All images are already used in the poster; cannot create additional notes")
            return []
        logger.info(f"Images available after filtering: {len(available_images)}")

        # If fewer images are available than requested, warn but continue
        if len(available_images) < num_additional_images:
            logger.warning(
                f"Available image count ({len(available_images)}) is less than the requested "
                f"note count ({num_additional_images}); using all available images"
            )
            selected_images = available_images
        else:
            # Randomly select the additional images
            selected_images = random.sample(available_images, num_additional_images)
        logger.info(f"Selected {len(selected_images)} images as notes")

        # Save the selected note images
        saved_paths = []
        for i, image_filename in enumerate(selected_images):
            try:
                # Load the image
                image_path = os.path.join(source_image_dir, image_filename)
                image = Image.open(image_path)
                # Build the output file name
                output_filename = output_filename_template.format(index=i + 1)
                # Build the metadata
                note_metadata = {
                    "original_image": image_filename,
                    "note_index": i + 1,
                    "source_dir": source_image_dir,
                    "associated_poster": os.path.basename(poster_image_path)
                }
                # Save the image via the output handler
                saved_path = self.output_handler.handle_generated_image(
                    run_id,
                    topic_index,
                    variant_index,
                    'note',  # image type is 'note'
                    image,
                    output_filename,
                    note_metadata
                )
                saved_paths.append(saved_path)
                logger.info(f"Saved note image {i + 1}/{len(selected_images)}: {saved_path}")
            except Exception as e:
                logger.error(f"Error while processing image '{image_filename}': {e}")
        return saved_paths

    def create_additional_images(
        self,
        run_id: str,
        topic_index: int,
        variant_index: int,
        poster_metadata_path: str,
        source_image_dir: str,
        num_additional_images: int = 3,
        output_filename_template: str = "additional_{index}.jpg",
        variation_strength: str = "medium",
        extra_effects: bool = True
    ) -> List[str]:
        """Select images not used by the poster as additional illustrations and process them to a 3:4 ratio."""
        logger.info(f"Selecting additional images for topic {topic_index}, variant {variant_index}")

        # Gather candidate images
        candidate_images = self.get_candidate_images(
            poster_metadata_path,
            source_image_dir,
            num_additional_images
        )
        if not candidate_images:
            logger.warning("No suitable candidate images found")
            return []

        # Derive a unique random seed
        seed_str = f"{run_id}_{topic_index}_{variant_index}"
        seed = sum(ord(c) for c in seed_str)
        logger.info(f"Using random seed: {seed}, derived from: {seed_str}")

        # Process the images in parallel across processes.
        # Note: ProcessPoolExecutor pickles the submitted callable and its arguments,
        # so self (including the OutputHandler) must be picklable for this to work.
        saved_paths = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
            # Submit one task per image
            future_to_image = {}
            for i, image_filename in enumerate(candidate_images):
                image_path = os.path.join(source_image_dir, image_filename)
                # Give each image its own seed
                image_seed = seed + i
                future = executor.submit(
                    self.process_single_image,
                    run_id,
                    topic_index,
                    variant_index,
                    image_path,
                    image_filename,
                    i,
                    source_image_dir,
                    output_filename_template.format(index=i + 1),
                    image_seed,
                    variation_strength,
                    extra_effects
                )
                future_to_image[future] = (i, image_filename)

            # Collect the results
            for future in concurrent.futures.as_completed(future_to_image):
                i, image_filename = future_to_image[future]
                try:
                    saved_path = future.result()
                    if saved_path:
                        saved_paths.append(saved_path)
                        logger.info(f"Saved additional image {i + 1}/{len(candidate_images)}: {saved_path}")
                except Exception as e:
                    logger.error(f"Error while processing image '{image_filename}': {e}")
                    logger.error(traceback.format_exc())
        return saved_paths

    def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
        """Return the list of candidate images, excluding those already used in the poster."""
        # Check that the input paths exist
        if not os.path.exists(poster_metadata_path):
            logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
            return []
        if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
            logger.error(f"Source image directory does not exist: {source_image_dir}")
            return []

        # Read the list of already-used images from the metadata file
        try:
            with open(poster_metadata_path, 'r', encoding='utf-8') as f:
                poster_metadata = json.load(f)
        except Exception as e:
            logger.error(f"Unable to read poster metadata: {e}")
            return []

        # Collect the images that were already used in the poster
        used_images = []
        if 'collage_images' in poster_metadata:
            used_images = poster_metadata['collage_images']
            logger.info(f"Poster already uses {len(used_images)} images: {', '.join(used_images)}")

        # List all image files in the source directory
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        available_images = [
            f for f in os.listdir(source_image_dir)
            if os.path.isfile(os.path.join(source_image_dir, f)) and
            f.lower().endswith(image_extensions)
        ]
        if not available_images:
            logger.error(f"No images found in source directory: {source_image_dir}")
            return []
        logger.info(f"Found {len(available_images)} images in source directory")

        # Filter out images already used in the poster
        available_images = [img for img in available_images if img not in used_images]
        if not available_images:
            logger.warning("All images are already used in the poster; cannot create additional illustrations")
            return []
        logger.info(f"Images available after filtering: {len(available_images)}")

        # If fewer images are available than requested, warn but continue
        if len(available_images) < num_images:
            logger.warning(
                f"Available image count ({len(available_images)}) is less than the requested "
                f"image count ({num_images}); using all available images"
            )
            selected_images = available_images
        else:
            # Randomly select the images, seeding for deterministic results
            random.seed(sum(map(ord, ''.join(available_images))))  # keep the selection reproducible
            selected_images = random.sample(available_images, num_images)
            random.seed()  # reset the random seed
        return selected_images

    def process_single_image(
        self,
        run_id,
        topic_index,
        variant_index,
        image_path,
        image_filename,
        index,
        source_dir,
        output_filename,
        seed,
        variation_strength,
        extra_effects
    ):
        """Process a single image. This method can run in a separate process."""
        try:
            # Load the image
            image = Image.open(image_path)
            # Convert to a 3:4 ratio and apply subtle variations
            processed_image = self.optimized_process_image(
                image,
                (3, 4),
                add_variation=True,
                seed=seed,
                variation_strength=variation_strength,
                extra_effects=extra_effects
            )
            # Build the metadata
            additional_metadata = {
                "original_image": image_filename,
                "additional_index": index + 1,
                "source_dir": source_dir,
                "is_additional_image": True,
                "processed": True,
                "aspect_ratio": "3:4",
                "variation_applied": True,
                "variation_strength": variation_strength,
                "extra_effects": extra_effects
            }
            # Save the image via the output handler
            return self.output_handler.handle_generated_image(
                run_id,
                topic_index,
                variant_index,
                'additional',  # image type is 'additional'
                processed_image,
                output_filename,
                additional_metadata
            )
        except Exception as e:
            logger.error(f"Error while processing image '{image_filename}': {e}")
            logger.error(traceback.format_exc())
            return None

    def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
        """
        Add noise in the DCT domain to counter pHash (requires SciPy) - strengthened version.

        Args:
            image: Input image (a grayscale image or the luminance channel is recommended).
            intensity: Noise intensity (0-1).
            block_size: DCT block size (usually 8).

        Returns:
            The image with noise added.
        """
        if not SCIPY_AVAILABLE:
            logger.warning("SciPy is not installed; cannot perform DCT noise injection. Run 'pip install scipy'.")
            # A simplified fallback could go here; for now, return the original image.
            return image
        try:
            logger.debug(f"Applying strengthened DCT noise, intensity: {intensity:.3f}")
            # Work on a grayscale image (or the luminance channel)
            if image.mode != 'L':
                gray_image = image.convert('L')
            else:
                gray_image = image
            img_array = np.array(gray_image, dtype=float)
            h, w = img_array.shape

            # Pad so the dimensions are multiples of the block size
            h_pad = (block_size - h % block_size) % block_size
            w_pad = (block_size - w % block_size) % block_size
            if h_pad != 0 or w_pad != 0:
                img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect')
                padded_h, padded_w = img_array.shape
            else:
                padded_h, padded_w = h, w

            # Target coefficient range: the low-frequency 4x4 corner (the DC term is restored afterwards)
            target_h, target_w = 4, 4
            for y in range(0, padded_h, block_size):
                for x in range(0, padded_w, block_size):
                    block = img_array[y:y + block_size, x:x + block_size]
                    dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
                    # --- Strengthened noise logic ---
                    # Size of the low-frequency region to perturb, clamped to the block size
                    apply_h = min(block_size, target_h, dct_block.shape[0])
                    apply_w = min(block_size, target_w, dct_block.shape[1])
                    # Multiplicative perturbation of the low-frequency coefficients;
                    # this tends to preserve structure better than additive noise.
                    factor = np.random.uniform(1.0 - intensity * 0.8, 1.0 + intensity * 0.8,
                                               (apply_h, apply_w))
                    dct_block[0:apply_h, 0:apply_w] *= factor[0:apply_h, 0:apply_w]
                    dct_block[0, 0] /= factor[0, 0]  # undo the scaling of the DC coefficient
                    # --- End strengthened noise logic ---
                    idct_block = idct(idct(dct_block.T, norm='ortho').T, norm='ortho')
                    img_array[y:y + block_size, x:x + block_size] = idct_block

            if h_pad != 0 or w_pad != 0:
                img_array = img_array[:h, :w]
            img_array = np.clip(img_array, 0, 255)
            modified_gray = Image.fromarray(img_array.astype(np.uint8))

            if image.mode == 'RGB' and gray_image is not image:
                blend_factor = 0.35  # slightly stronger blend
                r, g, b = image.split()
                r = Image.blend(r, modified_gray, blend_factor)
                g = Image.blend(g, modified_gray, blend_factor)
                b = Image.blend(b, modified_gray, blend_factor)
                merged_image = Image.merge('RGB', (r, g, b))
            else:
                merged_image = modified_gray
            logger.debug("Strengthened DCT noise applied successfully.")
            return merged_image
        except Exception as e:
            logger.error(f"Strengthened DCT noise injection failed: {e}")
            return image

    def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
        """Delegate to the strengthened add_dct_noise method."""
        logger.debug(f"Calling strengthened add_dct_noise to counter pHash, intensity: {intensity:.3f}")
        return self.add_dct_noise(image, intensity=intensity)
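
    # Background note (added explanation, not part of the original code): a typical
    # perceptual hash (pHash) downscales the image, converts it to grayscale, takes
    # a 2D DCT, and derives each hash bit from whether a low-frequency coefficient
    # lies above or below the median of that low-frequency block. Scaling those
    # low-frequency coefficients while restoring the DC term, as add_dct_noise does,
    # is therefore intended to flip some hash bits with little visible change.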

    def apply_smart_crop_resize(self, image: Image.Image, strength: str = "medium") -> Image.Image:
        """
        Apply a small crop followed by rescaling to resist hash algorithms.

        Args:
            image: Input image.
            strength: Processing strength ('low', 'medium', 'high').

        Returns:
            The processed image.
        """
        try:
            original_width, original_height = image.size
            logger.debug(f"Applying smart crop + rescale (strength: {strength}), original size: {original_width}x{original_height}")

            # Decide the crop amount based on the strength (0-3 pixels)
            if strength == "low":
                max_crop = 1
            elif strength == "high":
                max_crop = 3
            else:  # medium
                max_crop = 2

            # Randomly choose how much to crop from each side
            crop_left = random.randint(0, max_crop)
            crop_top = random.randint(0, max_crop)
            crop_right = random.randint(0, max_crop)
            crop_bottom = random.randint(0, max_crop)

            # Compute the crop box
            left = crop_left
            top = crop_top
            right = original_width - crop_right
            bottom = original_height - crop_bottom

            # Make sure the cropped area is at least 1x1
            if left >= right or top >= bottom:
                logger.warning("Smart crop produced an invalid region; skipping this step.")
                return image
            logger.debug(f"  Crop amounts: L={crop_left}, T={crop_top}, R={crop_right}, B={crop_bottom}")
            logger.debug(f"  Crop box: ({left}, {top}, {right}, {bottom})")

            # Perform the crop
            cropped_image = image.crop((left, top, right, bottom))

            # Scale the image back to the original size using high-quality interpolation
            logger.debug(f"  Rescaling cropped image ({cropped_image.width}x{cropped_image.height}) back to ({original_width}x{original_height})")
            resampling_filter = Image.LANCZOS  # high-quality resampling
            resized_image = cropped_image.resize((original_width, original_height), resample=resampling_filter)
            logger.debug("Smart crop + rescale applied successfully.")
            return resized_image
        except Exception as e:
            logger.error(f"Error during smart crop + rescale: {e}")
            return image  # return the original image on failure

    def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
        """
        Perturb the image's color histogram to counter matching based on color statistics.

        Args:
            image: Input image.
            strength: Perturbation strength (0-1).

        Returns:
            The processed image.
        """
        logger.debug(f"Perturbing color histogram, strength: {strength:.3f}")
        # Ensure RGB mode
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Convert to a NumPy array
        img_array = np.array(image)
        height, width, channels = img_array.shape

        # Process each channel independently
        for channel in range(channels):
            # Histogram of the current channel
            hist, _ = np.histogram(img_array[:, :, channel].flatten(), bins=64, range=(0, 256))
            # Find the dominant color bins (highest frequencies)
            threshold = np.percentile(hist, 70)  # keep roughly the top 30% of bins
            significant_bins = np.where(hist > threshold)[0]
            if len(significant_bins) > 0:
                for bin_idx in significant_bins:
                    # Color range covered by this bin
                    bin_width = 256 // 64
                    color_low = bin_idx * bin_width
                    color_high = (bin_idx + 1) * bin_width
                    # Mask of pixels that fall into this color range
                    mask = (img_array[:, :, channel] >= color_low) & (img_array[:, :, channel] < color_high)
                    if np.any(mask):
                        # Random offset for this bin. Note: with 64 bins (bin_width = 4)
                        # and small strengths, the integer offset often rounds to 0.
                        offset = int(strength * bin_width * (random.random() - 0.5) * 2)
                        # Apply the offset in a wider dtype to avoid uint8 wrap-around,
                        # then clamp to the 0-255 range
                        img_array[:, :, channel][mask] = np.clip(
                            img_array[:, :, channel][mask].astype(np.int16) + offset, 0, 255).astype(np.uint8)
        # Convert back to a PIL image
        logger.debug("Color histogram perturbation applied successfully.")
        return Image.fromarray(img_array)

    def strip_metadata(self, image: Image.Image) -> Image.Image:
        """
        Remove all metadata from the image (fixed version).

        Args:
            image: Input image.

        Returns:
            The image without metadata.
        """
        logger.debug("Stripping image metadata...")
        try:
            # Re-encode the in-memory pixel data so the modified data, not the
            # original file, is what gets saved. Keep the original format when it
            # is known; in-memory images have format=None, so fall back to PNG.
            img_format = getattr(image, 'format', None) or 'PNG'

            # Handle an alpha channel correctly, if present
            if image.mode == 'RGBA':
                # Composite onto a white background using the alpha channel as the mask
                background = Image.new("RGB", image.size, (255, 255, 255))
                background.paste(image, mask=image.split()[3])  # index 3 is the alpha channel
                image_to_save = background
                img_format = 'JPEG'  # usually saved as JPEG once alpha is removed
            elif image.mode == 'P':
                # Convert palette images to RGB
                image_to_save = image.convert('RGB')
                img_format = 'JPEG'
            else:
                image_to_save = image

            # Save to an in-memory buffer
            data = io.BytesIO()
            # Specify the quality explicitly so default compression does not introduce surprises
            if img_format == 'JPEG':
                image_to_save.save(data, format='JPEG', quality=95)  # high-quality JPEG
            else:
                # Lossless formats such as PNG do not need a quality parameter
                image_to_save.save(data, format=img_format)
            data.seek(0)  # rewind the buffer
            reloaded_image = Image.open(data)
            logger.debug("Metadata stripped successfully.")
            return reloaded_image
        except Exception as e:
            logger.error(f"Error while stripping metadata: {e}")
            return image  # return the original image on failure

    def apply_overlay_noise(self, image: Image.Image, alpha: int = 10, noise_type: str = 'uniform') -> Image.Image:
        """
        Overlay a low-opacity noise layer on top of the image.

        Args:
            image: Input image.
            alpha: Alpha value of the overlaid noise layer (0-255).
            noise_type: 'gaussian' or 'uniform'.

        Returns:
            The image with the noise overlay applied.
        """
        try:
            logger.debug(f"Applying low-opacity noise overlay: alpha={alpha}, type={noise_type}")
            # Work in RGBA so the overlay's transparency is handled correctly
            if image.mode != 'RGBA':
                base_image = image.convert('RGBA')
            else:
                base_image = image.copy()  # work on a copy
            width, height = base_image.size

            # Build the noise layer (grayscale noise is sufficient)
            if noise_type == 'gaussian':
                # Gaussian noise in the 0-255 range, mean 128
                noise_array = np.random.normal(loc=128, scale=40, size=(height, width)).clip(0, 255).astype(np.uint8)
            else:  # uniform
                noise_array = np.random.randint(0, 256, size=(height, width), dtype=np.uint8)
            noise_image = Image.fromarray(noise_array, mode='L')

            # Convert the grayscale noise to RGBA and set its alpha channel
            noise_rgba = noise_image.convert('RGBA')
            # Constant alpha channel at the requested opacity
            alpha_channel = Image.new('L', noise_image.size, alpha)
            noise_rgba.putalpha(alpha_channel)

            # Blend with alpha_composite, which requires both inputs to be RGBA
            combined_image = Image.alpha_composite(base_image, noise_rgba)

            # We usually want an RGB result, so convert back.
            # Skip this step if the original was RGBA and transparency must be preserved.
            final_image = combined_image.convert('RGB')
            logger.debug("Low-opacity noise overlay applied successfully.")
            return final_image
        except Exception as e:
            logger.error(f"Error while applying the noise overlay: {e}")
            logger.error(traceback.format_exc())  # log full details
            return image  # return the original image on failure

    def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image:
        """Optimized anti-hash pipeline: crop/rescale + DCT noise + color perturbation + noise overlay."""
        logger.info(f"--- Starting optimized anti-hash methods (strength: {strength}) - noise-overlay strategy ---")
        original_image_for_logging = image.copy()

        # --- Parameters ---
        if strength == "low":
            phash_intensity = 0.06       # mild DCT noise
            color_hist_strength = 0.02   # mild color perturbation
            apply_crop_resize = True     # apply crop + rescale
            noise_alpha = random.randint(5, 8)    # low-opacity noise
        elif strength == "high":
            phash_intensity = 0.20       # strong DCT noise
            color_hist_strength = 0.06   # strong color perturbation
            apply_crop_resize = True     # apply crop + rescale
            noise_alpha = random.randint(12, 18)  # higher-opacity noise
        else:  # medium
            phash_intensity = 0.12       # medium DCT noise
            color_hist_strength = 0.04   # medium color perturbation
            apply_crop_resize = True     # apply crop + rescale
            noise_alpha = random.randint(8, 12)   # medium-opacity noise
        logger.debug(f"Parameters: pHash intensity={phash_intensity:.2f}, color strength={color_hist_strength:.2f}, crop+rescale={apply_crop_resize}, noise alpha={noise_alpha}")

        processed_image = image  # start from the original

        # 1. Smart crop + rescale (strategy C)
        if apply_crop_resize:
            processed_image = self.apply_smart_crop_resize(processed_image, strength)

        # 2. Strengthened pHash countermeasure (strategy A)
        logger.debug(f"Applying pHash countermeasure (strengthened DCT noise), intensity={phash_intensity:.2f}")
        processed_image = self.add_phash_noise(processed_image, intensity=phash_intensity)

        # 3. Light color histogram perturbation
        if color_hist_strength > 0:
            logger.debug(f"Applying color histogram perturbation, strength={color_hist_strength:.3f}")
            processed_image = self.perturb_color_histogram(processed_image, strength=color_hist_strength)

        # 4. Low-opacity noise overlay (core of the new strategy)
        if noise_alpha > 0:
            processed_image = self.apply_overlay_noise(processed_image, alpha=noise_alpha, noise_type='uniform')  # start with uniform noise
            # logging happens inside apply_overlay_noise

        # --- The earlier aHash blocks, dHash lines, regional transforms, Gaussian noise, etc. were removed ---
        logger.debug("aHash/dHash-specific countermeasures, regional transforms and Gaussian noise have been removed.")

        # Compare the image before and after processing
        try:
            diff = ImageChops.difference(original_image_for_logging.convert('RGB'), processed_image.convert('RGB')).getbbox()  # ensure matching modes
            if diff:
                logger.info(f"Image was modified. Difference bounding box: {diff}")
            else:
                logger.warning("!!! The optimized methods do not appear to have modified the image !!!")
        except ValueError:
            logger.warning("Cannot compare image difference: modes may differ or be invalid.")
        except Exception as log_e:
            logger.warning(f"Cannot compare image difference: {log_e}")
        logger.info(f"--- Finished optimized anti-hash methods (strength: {strength}) - noise-overlay strategy ---")
        return processed_image

    def optimized_process_image(
        self,
        image: Image.Image,
        target_ratio: Tuple[int, int],
        add_variation: bool = True,
        seed: int = None,
        variation_strength: str = "medium",
        extra_effects: bool = True
    ) -> Image.Image:
        """Optimized image processing: efficient resizing plus anti-duplicate-detection techniques."""
        # Seed the random generators for reproducibility
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

        # Choose parameters based on the variation strength
        if variation_strength == "low":
            brightness_factor = random.uniform(0.97, 1.03)
            contrast_factor = random.uniform(0.97, 1.03)
            saturation_factor = random.uniform(0.97, 1.03)
            max_rotation = 0.5
            border_size = random.randint(0, 1)
            use_extra = random.random() < 0.3 and extra_effects
        elif variation_strength == "high":
            brightness_factor = random.uniform(0.92, 1.08)
            contrast_factor = random.uniform(0.92, 1.08)
            saturation_factor = random.uniform(0.92, 1.08)
            max_rotation = 2.0
            border_size = random.randint(0, 3)
            use_extra = extra_effects
        else:  # medium
            brightness_factor = random.uniform(0.95, 1.05)
            contrast_factor = random.uniform(0.95, 1.05)
            saturation_factor = random.uniform(0.95, 1.05)
            max_rotation = 1.0
            border_size = random.randint(0, 2)
            use_extra = random.random() < 0.7 and extra_effects

        # Bring the image to the target aspect ratio
        width, height = image.size
        current_ratio = width / height
        target_ratio_value = target_ratio[0] / target_ratio[1]

        # Resize
        if current_ratio > target_ratio_value:  # image is wider than the target
            new_height = 1200
            new_width = int(new_height * current_ratio)
        else:  # image is taller than the target
            new_width = 900
            new_height = int(new_width / current_ratio)
        # Efficient high-quality resize
        resized_image = image.resize((new_width, new_height), Image.LANCZOS)

        # Crop to the target ratio, with a small random offset when variation is enabled
        resized_width, resized_height = resized_image.size
        if resized_width / resized_height > target_ratio_value:
            crop_width = int(resized_height * target_ratio_value)
            max_offset = max(1, min(10, (resized_width - crop_width) // 10))
            offset = random.randint(-max_offset, max_offset) if add_variation else 0
            crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
            crop_x2 = crop_x1 + crop_width
            result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
        else:
            crop_height = int(resized_width / target_ratio_value)
            max_offset = max(1, min(10, (resized_height - crop_height) // 10))
            offset = random.randint(-max_offset, max_offset) if add_variation else 0
            crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
            crop_y2 = crop_y1 + crop_height
            result = resized_image.crop((0, crop_y1, resized_width, crop_y2))

        # If no variation is requested, skip all variations and anti-hash processing
        if not add_variation:
            logger.info("add_variation=False; skipping all variations and anti-hash processing.")
            # Reset the random seeds
            if seed is not None:
                random.seed()
                np.random.seed()
            # Strip metadata and return
            return self.strip_metadata(result)
        logger.info(f"Applying basic variations and anti-hash processing (strength: {variation_strength}, extra effects: {use_extra})")
        processed_image = result.convert('RGB')
        # 1. Brightness adjustment
        if abs(brightness_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Brightness(processed_image)
            processed_image = enhancer.enhance(brightness_factor)
        # 2. Contrast adjustment
        if abs(contrast_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Contrast(processed_image)
            processed_image = enhancer.enhance(contrast_factor)
        # 3. Saturation adjustment
        if abs(saturation_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Color(processed_image)
            processed_image = enhancer.enhance(saturation_factor)
        # 4. Rotation (only at medium/high strength)
        if variation_strength != "low" and abs(max_rotation) > 0.1:
            rotation_angle = random.uniform(-max_rotation, max_rotation)
            if abs(rotation_angle) > 0.1:  # only rotate when the angle is large enough
                processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
        # 5. Anti-hash techniques
        if use_extra:
            logger.debug("Calling optimize_anti_hash_methods...")
            processed_image = self.optimize_anti_hash_methods(processed_image, variation_strength)
        else:
            logger.info("use_extra=False; skipping optimize_anti_hash_methods.")

        # Apply extra blur/sharpen/border effects when use_extra is True
        if use_extra:
            logger.debug("Applying extra effects (blur/sharpen/border)...")
            # Decide which effects to apply based on strength
            apply_sharpen = random.random() < 0.4
            apply_blur = not apply_sharpen and random.random() < 0.3
            # Sharpen
            if apply_sharpen:
                enhancer = ImageEnhance.Sharpness(processed_image)
                sharpness = 1.2 if variation_strength == "high" else 1.1
                processed_image = enhancer.enhance(sharpness)
            # Blur
            elif apply_blur:
                radius = 0.7 if variation_strength == "high" else 0.4
                processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
            # Border (only when the image is not too small)
            if border_size > 0 and min(processed_image.size) > 300:
                border_color = (
                    random.randint(0, 5),
                    random.randint(0, 5),
                    random.randint(0, 5)
                )
                w, h = processed_image.size
                bordered = Image.new('RGB', (w + border_size * 2, h + border_size * 2), border_color)
                bordered.paste(processed_image, (border_size, border_size))
                processed_image = bordered  # fix: keep the bordered result (it was previously discarded)
            logger.debug("Extra effects applied.")
        else:
            logger.info("use_extra=False; skipping extra effects.")

        # Important: call the fixed strip_metadata after all modifications
        logger.debug("Calling strip_metadata last to remove metadata.")
        final_image = self.strip_metadata(processed_image)

        # Reset the random seeds
        if seed is not None:
            random.seed()
            np.random.seed()
            logger.debug("Random seeds have been reset.")
        logger.info(f"Image processing complete (strength: {variation_strength})")
        return final_image
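

# Illustrative sanity-check helpers (an added sketch, not part of the original module).
# A minimal average-hash ("aHash") implementation, assuming only Pillow and NumPy,
# that can be used locally to verify that the anti-hash processing above actually
# changes perceptual-hash bits. The names ahash_bits and ahash_distance are
# hypothetical helpers introduced here for illustration only.
def ahash_bits(image: Image.Image, hash_size: int = 8) -> np.ndarray:
    """Compute a simple average-hash bit array (verification sketch only)."""
    gray = image.convert('L').resize((hash_size, hash_size), Image.LANCZOS)
    pixels = np.asarray(gray, dtype=float)
    # One bit per pixel: above or below the mean brightness
    return (pixels > pixels.mean()).flatten()


def ahash_distance(img_a: Image.Image, img_b: Image.Image) -> int:
    """Hamming distance between the average hashes of two images (verification sketch only)."""
    return int(np.count_nonzero(ahash_bits(img_a) != ahash_bits(img_b)))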


def process_poster_for_notes(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_image_path: str,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
    """
    Process a poster and create its note images.

    Args:
        run_id: Run ID.
        topic_index: Topic index.
        variant_index: Variant index.
        poster_image_path: Path to the poster image.
        poster_metadata_path: Path to the poster metadata.
        source_image_dir: Directory containing the source images.
        num_additional_images: Number of additional images to use.
        output_handler: Output handler.
        output_filename_template: Template for the output file names.

    Returns:
        List[str]: Paths of the saved note images.
    """
    logger.info(f"Creating note images for poster: {poster_image_path}")
    # Validate the input
    if not os.path.exists(poster_image_path):
        logger.error(f"Poster image does not exist: {poster_image_path}")
        return []
    # Build a creator instance and process
    creator = PosterNotesCreator(output_handler)
    return creator.create_notes_images(
        run_id,
        topic_index,
        variant_index,
        poster_image_path,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template
    )


def select_additional_images(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "additional_{index}.jpg",
    variation_strength: str = "medium",
    extra_effects: bool = True
) -> List[str]:
    """
    Select images not used by the poster as additional illustrations and process them to a 3:4 ratio.
    """
    logger.info(f"Selecting additional images for topic {topic_index}, variant {variant_index}")
    # Validate the input
    if not os.path.exists(poster_metadata_path):
        logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
        return []
    # Build a creator instance
    creator = PosterNotesCreator(output_handler)
    # Process the images using the optimized pipeline
    return creator.create_additional_images(
        run_id,
        topic_index,
        variant_index,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template,
        variation_strength,
        extra_effects
    )
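

# Hedged usage sketch (added, not in the original file): a minimal example of how
# this module might be driven. The run_id, paths and the default-constructed
# OutputHandler below are hypothetical placeholders; adapt them to the real pipeline.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    handler = OutputHandler()  # assumption: OutputHandler can be constructed without arguments
    paths = select_additional_images(
        run_id="demo_run",
        topic_index=0,
        variant_index=0,
        poster_metadata_path="output/demo_run/topic_0/variant_0/poster_metadata.json",  # hypothetical path
        source_image_dir="input/images",  # hypothetical path
        num_additional_images=3,
        output_handler=handler,
        variation_strength="medium",
        extra_effects=True,
    )
    print(f"Saved {len(paths)} additional images: {paths}")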