TravelContentCreator/utils/poster_notes_creator.py

886 lines
35 KiB
Python
Raw Normal View History

2025-04-26 14:53:54 +08:00
import os
import random
import logging
import json
from PIL import Image
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
2025-04-26 14:53:54 +08:00
from .output_handler import OutputHandler
import io
2025-04-26 14:53:54 +08:00
2025-05-06 16:34:46 +08:00
# 尝试导入 scipy如果失败则标记
try:
from scipy.fftpack import dct, idct
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
dct = None
idct = None
2025-04-26 14:53:54 +08:00
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图并随机选择额外的图片作为笔记图片
确保选择的笔记图片与海报中使用的图片不重复
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""选择未被海报使用的图像作为额外配图并处理为3:4比例"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
2025-04-26 14:53:54 +08:00
# 获取候选图像
candidate_images = self.get_candidate_images(
poster_metadata_path,
source_image_dir,
num_additional_images
)
if not candidate_images:
logger.warning("没有找到合适的候选图像")
return []
2025-04-26 14:53:54 +08:00
# 生成唯一的随机种子
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
# 使用多进程并行处理图像
saved_paths = []
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
# 创建任务
future_to_image = {}
for i, image_filename in enumerate(candidate_images):
image_path = os.path.join(source_image_dir, image_filename)
# 为每个图像创建单独的种子
image_seed = seed + i
future = executor.submit(
self.process_single_image,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
i,
source_image_dir,
output_filename_template.format(index=i+1),
image_seed,
variation_strength,
extra_effects
)
future_to_image[future] = (i, image_filename)
# 收集结果
for future in concurrent.futures.as_completed(future_to_image):
i, image_filename = future_to_image[future]
try:
saved_path = future.result()
if saved_path:
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(candidate_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
2025-04-26 14:53:54 +08:00
return saved_paths
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
2025-04-26 14:53:54 +08:00
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_images:
2025-04-26 14:53:54 +08:00
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_images})"
2025-04-26 14:53:54 +08:00
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
random.seed(sum(map(ord, ''.join(available_images)))) # 确保结果一致性
selected_images = random.sample(available_images, num_images)
random.seed() # 重置随机种子
2025-04-26 14:53:54 +08:00
return selected_images
def process_single_image(
self,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
index,
source_dir,
output_filename,
seed,
variation_strength,
extra_effects
):
"""处理单张图像 - 此方法可在独立进程中运行"""
try:
# 加载图像
image = Image.open(image_path)
# 处理图像为3:4比例并添加微小变化
processed_image = self.optimized_process_image(
image,
(3, 4),
add_variation=True,
seed=seed,
variation_strength=variation_strength,
extra_effects=extra_effects
)
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"variation_applied": True,
"variation_strength": variation_strength,
"extra_effects": extra_effects
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
return None
2025-05-06 16:34:46 +08:00
def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
"""
2025-05-06 16:34:46 +08:00
在DCT域添加噪声以对抗pHash (需要Scipy)
Args:
2025-05-06 16:34:46 +08:00
image: 输入图像 (建议传入灰度图或处理亮度通道)
intensity: 噪声强度 (0-1)
block_size: DCT块大小 (通常为8)
Returns:
2025-05-06 16:34:46 +08:00
添加噪声后的图像
"""
2025-05-06 16:34:46 +08:00
if not SCIPY_AVAILABLE:
logger.warning("Scipy 未安装无法执行DCT噪声注入。请运行 'pip install scipy'")
# 可以选择返回原图,或执行一个简化的备用方案
# 这里我们返回原图
return image
2025-05-06 16:34:46 +08:00
try:
# 确保是灰度图或提取亮度通道 (这里以灰度为例)
if image.mode != 'L':
# 如果是彩色图,可以在 Y 通道 (亮度) 操作
# 为了简化,我们先转为灰度处理
gray_image = image.convert('L')
else:
gray_image = image
2025-05-06 16:34:46 +08:00
img_array = np.array(gray_image, dtype=float)
h, w = img_array.shape
2025-05-06 16:34:46 +08:00
# 确保尺寸是块大小的倍数
h_pad = (block_size - h % block_size) % block_size
w_pad = (block_size - w % block_size) % block_size
if h_pad != 0 or w_pad != 0:
img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect')
padded_h, padded_w = img_array.shape
else:
padded_h, padded_w = h, w
2025-05-06 16:34:46 +08:00
# 分块处理
for y in range(0, padded_h, block_size):
for x in range(0, padded_w, block_size):
block = img_array[y:y+block_size, x:x+block_size]
2025-05-06 16:34:46 +08:00
# 执行2D DCT
dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
2025-05-06 16:34:46 +08:00
# 在非DC系数上添加噪声 (跳过 dct_block[0, 0])
# 噪声强度与系数幅度相关,避免在小系数上加过大噪声
noise = np.random.randn(block_size, block_size) * intensity * np.abs(dct_block)
# noise = np.random.uniform(-intensity*50, intensity*50, (block_size, block_size))
noise[0, 0] = 0 # 不改变DC系数
# 将噪声添加到DCT系数
noisy_dct_block = dct_block + noise
# 执行2D IDCT
idct_block = idct(idct(noisy_dct_block.T, norm='ortho').T, norm='ortho')
# 将处理后的块放回图像数组
img_array[y:y+block_size, x:x+block_size] = idct_block
# 裁剪回原始尺寸 (如果有填充)
if h_pad != 0 or w_pad != 0:
img_array = img_array[:h, :w]
# 裁剪像素值并转换类型
img_array = np.clip(img_array, 0, 255)
modified_gray = Image.fromarray(img_array.astype(np.uint8))
# 如果原图是彩色,将修改后的亮度通道合并回去
if image.mode == 'RGB' and gray_image is not image:
# 注意:简单替换亮度通道可能效果不好,混合通常更好
# 这里用混合的方式
blend_factor = 0.3 # 控制混合强度
r, g, b = image.split()
r = Image.blend(r, modified_gray, blend_factor)
g = Image.blend(g, modified_gray, blend_factor)
b = Image.blend(b, modified_gray, blend_factor)
return Image.merge('RGB', (r, g, b))
else:
# 如果原图是灰度或处理失败,返回修改后的灰度图
return modified_gray
except Exception as e:
logger.error(f"DCT噪声注入出错: {e}")
return image # 出错时返回原图
2025-05-06 16:34:46 +08:00
def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
"""
2025-05-06 16:34:46 +08:00
添加扰动以对抗感知哈希算法(pHash)
现在调用基于 Scipy DCT 噪声注入方法
Args:
image: 输入图像
2025-05-06 16:34:46 +08:00
intensity: 扰动强度(0-1)
Returns:
2025-05-06 16:34:46 +08:00
添加扰动后的图像
"""
2025-05-06 16:34:46 +08:00
return self.add_dct_noise(image, intensity=intensity)
2025-05-06 15:49:31 +08:00
def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
2025-05-06 16:34:46 +08:00
综合优化的哈希对抗方法强度已增加
2025-05-06 15:49:31 +08:00
"""
2025-05-06 16:34:46 +08:00
# 根据强度设置参数 (显著增加 high 强度)
2025-05-06 15:49:31 +08:00
if strength == "low":
ahash_intensity = 0.03
2025-05-06 16:34:46 +08:00
phash_intensity = 0.05 # 基础DCT噪声强度
2025-05-06 15:49:31 +08:00
dhash_intensity = 0.03
2025-05-06 16:34:46 +08:00
region_flip_prob = 0.3
num_ahash_blocks = random.randint(8, 15)
num_dhash_lines = random.randint(6, 10)
2025-05-06 15:49:31 +08:00
elif strength == "high":
2025-05-06 16:34:46 +08:00
ahash_intensity = 0.18 # 大幅增加
phash_intensity = 0.15 # 大幅增加
dhash_intensity = 0.18 # 大幅增加
region_flip_prob = 0.7 # 更大概率翻转
num_ahash_blocks = random.randint(20, 35) # 更多块
num_dhash_lines = random.randint(15, 25) # 更多线
2025-05-06 15:49:31 +08:00
else: # medium
2025-05-06 16:34:46 +08:00
ahash_intensity = 0.08 # 增加
phash_intensity = 0.08 # 增加
dhash_intensity = 0.08 # 增加
region_flip_prob = 0.5
num_ahash_blocks = random.randint(12, 25)
num_dhash_lines = random.randint(10, 18)
# 1. 针对aHash (平均哈希)的处理 - 强度已增加
img_array = np.array(image, dtype=np.int16)
2025-05-06 15:49:31 +08:00
h, w = img_array.shape[0], img_array.shape[1]
2025-05-06 16:34:46 +08:00
# num_ahash_blocks = random.randint(10, 20)
for _ in range(num_ahash_blocks):
2025-05-06 15:49:31 +08:00
block_w = random.randint(w//20, w//10)
block_h = random.randint(h//20, h//10)
x = random.randint(0, w - block_w)
y = random.randint(0, h - block_h)
2025-05-06 16:34:46 +08:00
delta = int(random.uniform(-35, 35) * ahash_intensity) # 增加delta范围
block = img_array[y:y+block_h, x:x+block_w]
img_array[y:y+block_h, x:x+block_w] = np.clip(block + delta, 0, 255)
2025-05-06 15:49:31 +08:00
image = Image.fromarray(img_array.astype(np.uint8))
2025-05-06 16:34:46 +08:00
# 2. 调用强化的pHash对抗方法
2025-05-06 15:49:31 +08:00
image = self.add_phash_noise(image, intensity=phash_intensity)
2025-05-06 16:34:46 +08:00
# 3. 针对dHash (差值哈希)的处理 - 强度已增加
img_array = np.array(image, dtype=np.int16)
h, w = img_array.shape[0], img_array.shape[1]
2025-05-06 15:49:31 +08:00
mask = np.zeros_like(img_array, dtype=bool)
2025-05-06 16:34:46 +08:00
# num_dhash_lines = random.randint(8, 12)
for _ in range(num_dhash_lines):
if random.random() < 0.5:
2025-05-06 15:49:31 +08:00
y = random.randint(0, h - 1)
2025-05-06 16:34:46 +08:00
line_width = random.randint(1, 4) # 增加线宽可能性
if len(mask.shape) == 3:
2025-05-06 15:49:31 +08:00
mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :, :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
x = random.randint(0, w - 1)
2025-05-06 16:34:46 +08:00
line_width = random.randint(1, 4) # 增加线宽可能性
if len(mask.shape) == 3:
2025-05-06 15:49:31 +08:00
mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1), :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1)] = True
2025-05-06 16:34:46 +08:00
delta = (np.random.random(img_array.shape) * 2 - 1) * dhash_intensity * 35 # 增加delta范围
img_array[mask] += delta[mask].astype(np.int16)
2025-05-06 15:49:31 +08:00
2025-05-06 16:34:46 +08:00
img_array = np.clip(img_array, 0, 255)
2025-05-06 15:49:31 +08:00
2025-05-06 16:34:46 +08:00
# 4. 颜色直方图扰动 (强度也略微增加)
image = Image.fromarray(img_array.astype(np.uint8))
color_hist_strength = dhash_intensity * 0.6 # 关联强度
image = self.perturb_color_histogram(image, strength=color_hist_strength)
2025-05-06 15:49:31 +08:00
2025-05-06 16:34:46 +08:00
# 5. 区域翻转 - 强度已增加
if random.random() < region_flip_prob:
2025-05-06 15:49:31 +08:00
img_array = np.array(image)
2025-05-06 16:34:46 +08:00
h, w = img_array.shape[0], img_array.shape[1]
# 增加区域大小可能性
max_region_factor = 15 if strength == 'high' else 20
region_w = random.randint(w//(max_region_factor+5), w//max_region_factor)
region_h = random.randint(h//(max_region_factor+5), h//max_region_factor)
2025-05-06 15:49:31 +08:00
x = random.randint(0, w - region_w)
y = random.randint(0, h - region_h)
2025-05-06 16:34:46 +08:00
# 加入90度旋转的可能性
action = random.choice(['flip_h', 'flip_v', 'rotate_90']) if strength != 'low' else random.choice(['flip_h', 'flip_v'])
region = img_array[y:y+region_h, x:x+region_w]
if action == 'flip_h':
img_array[y:y+region_h, x:x+region_w] = region[:, ::-1]
elif action == 'flip_v':
img_array[y:y+region_h, x:x+region_w] = region[::-1, :]
elif action == 'rotate_90' and len(img_array.shape) == 3: # 旋转只对原尺寸区域有效
# 注意:旋转可能需要调整区域大小或填充,这里简化处理
# 仅在区域接近正方形时效果较好
if abs(region_w - region_h) < 5:
rotated_region = np.rot90(region)
# 需要确保旋转后尺寸匹配,如果尺寸变化则跳过或填充
if rotated_region.shape[0] == region_h and rotated_region.shape[1] == region_w:
img_array[y:y+region_h, x:x+region_w] = rotated_region
2025-05-06 15:49:31 +08:00
image = Image.fromarray(img_array)
2025-05-06 16:34:46 +08:00
# 6. (新增可选) 轻微高斯噪声 - 对所有哈希都有轻微普适性干扰
if strength != 'low' and random.random() < 0.4:
img_array = np.array(image)
noise_sigma = 1.0 if strength == 'medium' else 2.0 # 噪声标准差
noise = np.random.normal(0, noise_sigma, img_array.shape)
img_array = np.clip(img_array + noise, 0, 255).astype(np.uint8)
image = Image.fromarray(img_array)
2025-05-06 15:49:31 +08:00
return image
def optimized_process_image(
2025-04-26 15:53:44 +08:00
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 15:53:44 +08:00
) -> Image.Image:
"""优化后的图像处理方法,使用更高效的算法,添加反查重技术"""
# 设置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed(seed)
np.random.seed(seed)
2025-04-26 15:53:44 +08:00
# 根据微调强度设置参数
2025-04-26 15:53:44 +08:00
if variation_strength == "low":
brightness_factor = random.uniform(0.97, 1.03)
contrast_factor = random.uniform(0.97, 1.03)
saturation_factor = random.uniform(0.97, 1.03)
2025-04-26 15:53:44 +08:00
max_rotation = 0.5
border_size = random.randint(0, 1)
use_extra = random.random() < 0.3 and extra_effects
2025-04-26 15:53:44 +08:00
elif variation_strength == "high":
brightness_factor = random.uniform(0.92, 1.08)
contrast_factor = random.uniform(0.92, 1.08)
saturation_factor = random.uniform(0.92, 1.08)
2025-04-26 15:53:44 +08:00
max_rotation = 2.0
border_size = random.randint(0, 3)
use_extra = extra_effects
else: # medium
brightness_factor = random.uniform(0.95, 1.05)
contrast_factor = random.uniform(0.95, 1.05)
saturation_factor = random.uniform(0.95, 1.05)
2025-04-26 15:53:44 +08:00
max_rotation = 1.0
border_size = random.randint(0, 2)
use_extra = random.random() < 0.7 and extra_effects
2025-04-26 15:53:44 +08:00
# 调整图像为目标比例
2025-04-26 15:53:44 +08:00
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 调整大小
2025-04-26 15:53:44 +08:00
if current_ratio > target_ratio_value: # 图片较宽
new_height = 1200
2025-04-26 15:53:44 +08:00
new_width = int(new_height * current_ratio)
else: # 图片较高
new_width = 900
2025-04-26 15:53:44 +08:00
new_height = int(new_width / current_ratio)
# 高效调整尺寸
2025-04-26 15:53:44 +08:00
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 裁剪为目标比例
2025-04-26 15:53:44 +08:00
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value:
2025-04-26 15:53:44 +08:00
crop_width = int(resized_height * target_ratio_value)
max_offset = max(1, min(10, (resized_width - crop_width) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
2025-04-26 15:53:44 +08:00
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else:
2025-04-26 15:53:44 +08:00
crop_height = int(resized_width / target_ratio_value)
max_offset = max(1, min(10, (resized_height - crop_height) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
2025-04-26 15:53:44 +08:00
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果不需要变化或是低强度且禁用额外效果
if not add_variation:
# 重置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed()
np.random.seed()
# 清除元数据后返回
return self.strip_metadata(result)
2025-04-26 15:53:44 +08:00
# 高效应用基本变化
processed_image = result.convert('RGB')
2025-04-26 15:53:44 +08:00
# 1. 亮度调整
if abs(brightness_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Brightness(processed_image)
processed_image = enhancer.enhance(brightness_factor)
2025-04-26 15:53:44 +08:00
# 2. 对比度调整
if abs(contrast_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Contrast(processed_image)
processed_image = enhancer.enhance(contrast_factor)
# 3. 饱和度调整
if abs(saturation_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Color(processed_image)
processed_image = enhancer.enhance(saturation_factor)
# 4. 旋转 (只在中高强度时应用)
if variation_strength != "low" and abs(max_rotation) > 0.1:
rotation_angle = random.uniform(-max_rotation, max_rotation)
if abs(rotation_angle) > 0.1: # 只有当角度足够大时才旋转
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 5. 新增 - 应用反查重技术
# 根据变化强度选择性应用
if use_extra:
2025-05-06 15:49:31 +08:00
# 使用综合优化的哈希对抗方法
processed_image = self.optimize_anti_hash_methods(processed_image, variation_strength)
# 应用额外效果 (只在需要时)
if use_extra:
# 根据强度决定是否应用特定效果
apply_sharpen = random.random() < 0.4
apply_blur = not apply_sharpen and random.random() < 0.3
2025-04-26 15:53:44 +08:00
# 锐化
if apply_sharpen:
enhancer = ImageEnhance.Sharpness(processed_image)
sharpness = 1.2 if variation_strength == "high" else 1.1
processed_image = enhancer.enhance(sharpness)
2025-04-26 15:53:44 +08:00
# 模糊
elif apply_blur:
radius = 0.7 if variation_strength == "high" else 0.4
processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
2025-04-26 15:53:44 +08:00
# 边框处理 (在图像不太小的情况下)
if border_size > 0 and min(processed_image.size) > 300:
border_color = (
random.randint(0, 5),
random.randint(0, 5),
random.randint(0, 5)
)
w, h = processed_image.size
bordered = Image.new('RGB', (w + border_size*2, h + border_size*2), border_color)
bordered.paste(processed_image, (border_size, border_size))
2025-04-26 15:53:44 +08:00
# 随机裁剪回原尺寸
offset_x = random.randint(0, border_size*2)
offset_y = random.randint(0, border_size*2)
processed_image = bordered.crop((offset_x, offset_y, offset_x + w, offset_y + h))
# 6. 始终清除元数据 - 最后一步
processed_image = self.strip_metadata(processed_image)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
return processed_image
2025-04-26 14:53:54 +08:00
2025-05-06 16:34:46 +08:00
def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
"""
扰动图像的颜色直方图对抗基于颜色统计的图像匹配
Args:
image: 输入图像
strength: 扰动强度(0-1)
Returns:
处理后的图像
"""
# 确保为RGB模式
if image.mode != 'RGB':
image = image.convert('RGB')
# 转为numpy数组
img_array = np.array(image)
height, width, channels = img_array.shape
# 对每个通道分别处理
for channel in range(channels):
# 计算当前通道的直方图
hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256))
# 找出主要颜色区间 (频率高的区间)
threshold = np.percentile(hist, 70) # 取前30%的颜色块
significant_bins = np.where(hist > threshold)[0]
if len(significant_bins) > 0:
for bin_idx in significant_bins:
# 计算当前bin对应的颜色范围
bin_width = 256 // 64
color_low = bin_idx * bin_width
color_high = (bin_idx + 1) * bin_width
# 创建颜色范围掩码
mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high)
if np.any(mask):
# 生成随机偏移值
offset = int(strength * bin_width * (random.random() - 0.5) * 2)
# 应用偏移确保在0-255范围内
img_array[:,:,channel][mask] = np.clip(
img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8)
# 转回PIL图像
return Image.fromarray(img_array)
def strip_metadata(self, image: Image.Image) -> Image.Image:
"""
移除图像中的所有元数据
Args:
image: 输入图像
Returns:
无元数据的图像
"""
# 创建无元数据的副本
data = io.BytesIO()
image.save(data, format=image.format if image.format else 'PNG')
return Image.open(data)
2025-04-26 14:53:54 +08:00
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""
2025-04-26 15:53:44 +08:00
选择未被海报使用的图像作为额外配图并处理为3:4比例
2025-04-26 14:53:54 +08:00
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例
2025-04-26 14:53:54 +08:00
creator = PosterNotesCreator(output_handler)
# 使用优化后的方法处理图像
2025-04-26 14:53:54 +08:00
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
2025-04-26 15:53:44 +08:00
output_filename_template,
variation_strength,
extra_effects
2025-04-26 14:53:54 +08:00
)