TravelContentCreator/utils/poster_notes_creator.py

978 lines
41 KiB
Python
Raw Normal View History

2025-04-26 14:53:54 +08:00
import os
import random
import logging
import json
from PIL import Image
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
2025-04-26 14:53:54 +08:00
from .output_handler import OutputHandler
import io
2025-04-26 14:53:54 +08:00
2025-05-06 16:34:46 +08:00
# 尝试导入 scipy如果失败则标记
try:
from scipy.fftpack import dct, idct
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
dct = None
idct = None
2025-04-26 14:53:54 +08:00
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图并随机选择额外的图片作为笔记图片
确保选择的笔记图片与海报中使用的图片不重复
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""选择未被海报使用的图像作为额外配图并处理为3:4比例"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
2025-04-26 14:53:54 +08:00
# 获取候选图像
candidate_images = self.get_candidate_images(
poster_metadata_path,
source_image_dir,
num_additional_images
)
if not candidate_images:
logger.warning("没有找到合适的候选图像")
return []
2025-04-26 14:53:54 +08:00
# 生成唯一的随机种子
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
# 使用多进程并行处理图像
saved_paths = []
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
# 创建任务
future_to_image = {}
for i, image_filename in enumerate(candidate_images):
image_path = os.path.join(source_image_dir, image_filename)
# 为每个图像创建单独的种子
image_seed = seed + i
future = executor.submit(
self.process_single_image,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
i,
source_image_dir,
output_filename_template.format(index=i+1),
image_seed,
variation_strength,
extra_effects
)
future_to_image[future] = (i, image_filename)
# 收集结果
for future in concurrent.futures.as_completed(future_to_image):
i, image_filename = future_to_image[future]
try:
saved_path = future.result()
if saved_path:
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{len(candidate_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
2025-04-26 14:53:54 +08:00
return saved_paths
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
2025-04-26 14:53:54 +08:00
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_images:
2025-04-26 14:53:54 +08:00
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_images})"
2025-04-26 14:53:54 +08:00
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
random.seed(sum(map(ord, ''.join(available_images)))) # 确保结果一致性
selected_images = random.sample(available_images, num_images)
random.seed() # 重置随机种子
2025-04-26 14:53:54 +08:00
return selected_images
def process_single_image(
self,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
index,
source_dir,
output_filename,
seed,
variation_strength,
extra_effects
):
"""处理单张图像 - 此方法可在独立进程中运行"""
try:
# 加载图像
image = Image.open(image_path)
# 处理图像为3:4比例并添加微小变化
processed_image = self.optimized_process_image(
image,
(3, 4),
add_variation=True,
seed=seed,
variation_strength=variation_strength,
extra_effects=extra_effects
)
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"variation_applied": True,
"variation_strength": variation_strength,
"extra_effects": extra_effects
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
return None
2025-05-06 16:34:46 +08:00
def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
"""
2025-05-06 16:34:46 +08:00
在DCT域添加噪声以对抗pHash (需要Scipy)
Args:
2025-05-06 16:34:46 +08:00
image: 输入图像 (建议传入灰度图或处理亮度通道)
intensity: 噪声强度 (0-1)
block_size: DCT块大小 (通常为8)
Returns:
2025-05-06 16:34:46 +08:00
添加噪声后的图像
"""
2025-05-06 16:34:46 +08:00
if not SCIPY_AVAILABLE:
logger.warning("Scipy 未安装无法执行DCT噪声注入。请运行 'pip install scipy'")
# 可以选择返回原图,或执行一个简化的备用方案
# 这里我们返回原图
return image
2025-05-06 16:34:46 +08:00
try:
logger.debug(f"应用DCT噪声强度: {intensity:.3f}")
2025-05-06 16:34:46 +08:00
# 确保是灰度图或提取亮度通道 (这里以灰度为例)
if image.mode != 'L':
# 如果是彩色图,可以在 Y 通道 (亮度) 操作
# 为了简化,我们先转为灰度处理
gray_image = image.convert('L')
else:
gray_image = image
2025-05-06 16:34:46 +08:00
img_array = np.array(gray_image, dtype=float)
h, w = img_array.shape
2025-05-06 16:34:46 +08:00
# 确保尺寸是块大小的倍数
h_pad = (block_size - h % block_size) % block_size
w_pad = (block_size - w % block_size) % block_size
if h_pad != 0 or w_pad != 0:
img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect')
padded_h, padded_w = img_array.shape
else:
padded_h, padded_w = h, w
2025-05-06 16:34:46 +08:00
# 分块处理
for y in range(0, padded_h, block_size):
for x in range(0, padded_w, block_size):
block = img_array[y:y+block_size, x:x+block_size]
2025-05-06 16:34:46 +08:00
# 执行2D DCT
dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
2025-05-06 16:34:46 +08:00
# 在非DC系数上添加噪声 (跳过 dct_block[0, 0])
# 噪声强度与系数幅度相关,避免在小系数上加过大噪声
noise = np.random.randn(block_size, block_size) * intensity * np.abs(dct_block)
# noise = np.random.uniform(-intensity*50, intensity*50, (block_size, block_size))
noise[0, 0] = 0 # 不改变DC系数
# 将噪声添加到DCT系数
noisy_dct_block = dct_block + noise
# 执行2D IDCT
idct_block = idct(idct(noisy_dct_block.T, norm='ortho').T, norm='ortho')
# 将处理后的块放回图像数组
img_array[y:y+block_size, x:x+block_size] = idct_block
# 裁剪回原始尺寸 (如果有填充)
if h_pad != 0 or w_pad != 0:
img_array = img_array[:h, :w]
# 裁剪像素值并转换类型
img_array = np.clip(img_array, 0, 255)
modified_gray = Image.fromarray(img_array.astype(np.uint8))
# 如果原图是彩色,将修改后的亮度通道合并回去
if image.mode == 'RGB' and gray_image is not image:
# 注意:简单替换亮度通道可能效果不好,混合通常更好
# 这里用混合的方式
blend_factor = 0.3 # 控制混合强度
r, g, b = image.split()
r = Image.blend(r, modified_gray, blend_factor)
g = Image.blend(g, modified_gray, blend_factor)
b = Image.blend(b, modified_gray, blend_factor)
merged_image = Image.merge('RGB', (r, g, b))
2025-05-06 16:34:46 +08:00
else:
# 如果原图是灰度或处理失败,返回修改后的灰度图
merged_image = modified_gray
2025-05-06 16:34:46 +08:00
# 在函数末尾成功时记录
logger.debug("DCT噪声应用成功。")
return merged_image if 'merged_image' in locals() else modified_gray # 返回最终结果
2025-05-06 16:34:46 +08:00
except Exception as e:
logger.error(f"DCT噪声注入出错: {e}")
return image # 出错时返回原图
2025-05-06 16:34:46 +08:00
def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
"""
2025-05-06 16:34:46 +08:00
添加扰动以对抗感知哈希算法(pHash)
现在调用基于 Scipy DCT 噪声注入方法
Args:
image: 输入图像
2025-05-06 16:34:46 +08:00
intensity: 扰动强度(0-1)
Returns:
2025-05-06 16:34:46 +08:00
添加扰动后的图像
"""
logger.debug(f"调用add_dct_noise对抗pHash强度: {intensity:.3f}")
2025-05-06 16:34:46 +08:00
return self.add_dct_noise(image, intensity=intensity)
2025-05-06 15:49:31 +08:00
def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""综合优化的哈希对抗方法强度已增加添加日志记录High强度极度强化"""
logger.info(f"--- 开始优化抗哈希方法 (强度: {strength}) ---")
original_image_for_logging = image.copy() # 复制一份用于前后对比日志
# 根据强度设置参数 (极度增加 high 强度)
2025-05-06 15:49:31 +08:00
if strength == "low":
ahash_intensity = 0.03
2025-05-06 16:34:46 +08:00
phash_intensity = 0.05 # 基础DCT噪声强度
2025-05-06 15:49:31 +08:00
dhash_intensity = 0.03
2025-05-06 16:34:46 +08:00
region_flip_prob = 0.3
num_ahash_blocks = random.randint(8, 15)
num_dhash_lines = random.randint(6, 10)
ahash_delta_range = (-30, 30)
dhash_delta_range = 30
region_max_factor = 25
gaussian_noise_sigma = 0.0 # Low 不加高斯噪声
gaussian_noise_prob = 0.0
2025-05-06 15:49:31 +08:00
elif strength == "high":
ahash_intensity = 0.40 # 极度增加
phash_intensity = 0.25 # 极度增加
dhash_intensity = 0.40 # 极度增加
region_flip_prob = 0.90 # 很高概率翻转
num_ahash_blocks = random.randint(40, 60) # 很多块
num_dhash_lines = random.randint(30, 45) # 很多线
ahash_delta_range = (-50, 50) # 更大亮度变化
dhash_delta_range = 50 # 更大梯度变化
region_max_factor = 12 # 区域更大
gaussian_noise_sigma = 3.5 # 更强高斯噪声
gaussian_noise_prob = 0.6 # 更高概率
else: # medium (也适度增加)
ahash_intensity = 0.10 # 增加
phash_intensity = 0.10 # 增加
dhash_intensity = 0.10 # 增加
region_flip_prob = 0.6
num_ahash_blocks = random.randint(15, 30)
num_dhash_lines = random.randint(12, 20)
ahash_delta_range = (-40, 40)
dhash_delta_range = 40
region_max_factor = 18
gaussian_noise_sigma = 1.5
gaussian_noise_prob = 0.5
logger.debug(f"参数: aHash强度={ahash_intensity:.2f}, pHash强度={phash_intensity:.2f}, dHash强度={dhash_intensity:.2f}, 翻转概率={region_flip_prob:.2f}")
# 1. 针对aHash ...
logger.debug(f"应用 aHash 对抗: {num_ahash_blocks} 个亮度块, 强度={ahash_intensity:.2f}, delta范围={ahash_delta_range}")
img_array = np.array(image, dtype=np.int16)
2025-05-06 15:49:31 +08:00
h, w = img_array.shape[0], img_array.shape[1]
2025-05-06 16:34:46 +08:00
for _ in range(num_ahash_blocks):
2025-05-06 15:49:31 +08:00
block_w = random.randint(w//20, w//10)
block_h = random.randint(h//20, h//10)
x = random.randint(0, w - block_w)
y = random.randint(0, h - block_h)
delta = int(random.uniform(ahash_delta_range[0], ahash_delta_range[1]) * ahash_intensity)
2025-05-06 16:34:46 +08:00
block = img_array[y:y+block_h, x:x+block_w]
img_array[y:y+block_h, x:x+block_w] = np.clip(block + delta, 0, 255)
2025-05-06 15:49:31 +08:00
image = Image.fromarray(img_array.astype(np.uint8))
logger.debug("aHash 对抗完成。")
# 2. 调用强化的pHash对抗方法 (使用 scipy DCT)
logger.debug(f"应用 pHash 对抗 (DCT噪声), 强度={phash_intensity:.2f}")
image = self.add_phash_noise(image, intensity=phash_intensity)
# add_phash_noise 内部已有成功日志
# 3. 针对dHash ...
logger.debug(f"应用 dHash 对抗: {num_dhash_lines} 条梯度线, 强度={dhash_intensity:.2f}, delta范围=+/-({dhash_delta_range})")
img_array = np.array(image, dtype=np.int16)
2025-05-06 16:34:46 +08:00
h, w = img_array.shape[0], img_array.shape[1]
2025-05-06 15:49:31 +08:00
mask = np.zeros_like(img_array, dtype=bool)
2025-05-06 16:34:46 +08:00
for _ in range(num_dhash_lines):
line_width = random.randint(2, 5) # 增加线宽
2025-05-06 16:34:46 +08:00
if random.random() < 0.5:
2025-05-06 15:49:31 +08:00
y = random.randint(0, h - 1)
2025-05-06 16:34:46 +08:00
if len(mask.shape) == 3:
2025-05-06 15:49:31 +08:00
mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :, :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
x = random.randint(0, w - 1)
2025-05-06 16:34:46 +08:00
if len(mask.shape) == 3:
2025-05-06 15:49:31 +08:00
mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1), :] = True
2025-05-06 16:34:46 +08:00
else:
2025-05-06 15:49:31 +08:00
mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1)] = True
delta = (np.random.random(img_array.shape) * 2 - 1) * dhash_intensity * dhash_delta_range
2025-05-06 16:34:46 +08:00
img_array[mask] += delta[mask].astype(np.int16)
img_array = np.clip(img_array, 0, 255)
image = Image.fromarray(img_array.astype(np.uint8))
logger.debug("dHash 对抗完成。")
# 4. 颜色直方图扰动 ...
color_hist_strength = dhash_intensity * 0.7 # 关联强度增加
logger.debug(f"应用颜色直方图扰动, 强度={color_hist_strength:.3f}")
2025-05-06 16:34:46 +08:00
image = self.perturb_color_histogram(image, strength=color_hist_strength)
# perturb_color_histogram 内部已有成功日志
# 5. 区域翻转/旋转 ...
2025-05-06 16:34:46 +08:00
if random.random() < region_flip_prob:
logger.debug(f"应用区域变换 (翻转/旋转), 概率触发成功.")
2025-05-06 15:49:31 +08:00
img_array = np.array(image)
2025-05-06 16:34:46 +08:00
h, w = img_array.shape[0], img_array.shape[1]
region_w = random.randint(w//(region_max_factor+5), w//region_max_factor)
region_h = random.randint(h//(region_max_factor+5), h//region_max_factor)
region_w = max(1, region_w)
region_h = max(1, region_h)
x = random.randint(0, max(0, w - region_w))
y = random.randint(0, max(0, h - region_h))
2025-05-06 16:34:46 +08:00
action = random.choice(['flip_h', 'flip_v', 'rotate_90']) if strength != 'low' else random.choice(['flip_h', 'flip_v'])
action_applied = "skipped"
try: # 添加 try-except 捕获潜在错误
if y+region_h <= h and x+region_w <= w:
region = img_array[y:y+region_h, x:x+region_w]
if region.size == 0:
action_applied = "empty_region_skipped"
elif action == 'flip_h':
img_array[y:y+region_h, x:x+region_w] = region[:, ::-1]
action_applied = action
elif action == 'flip_v':
img_array[y:y+region_h, x:x+region_w] = region[::-1, :]
action_applied = action
elif action == 'rotate_90' and len(img_array.shape) == 3:
if abs(region_w - region_h) < region_w * 0.3:
rotated_region = np.rot90(region)
if rotated_region.shape[0] == region_h and rotated_region.shape[1] == region_w:
img_array[y:y+region_h, x:x+region_w] = rotated_region
action_applied = action
else:
logger.debug(f" 区域旋转跳过: 尺寸不匹配 ({region.shape} -> {rotated_region.shape})")
action_applied = "rotate_skipped_size_mismatch"
else:
logger.debug(f" 区域旋转跳过: 非方形区域 ({region_w}x{region_h})")
action_applied = "rotate_skipped_not_square"
else:
action_applied = f"{action}_skipped_condition_not_met"
else:
action_applied = "region_bounds_error_skipped"
logger.warning(f"跳过区域变换,切片索引无效: y={y}, h={region_h}, x={x}, w={region_w}, img_shape={img_array.shape}")
except Exception as e_region:
logger.warning(f"应用区域变换 {action} 时出错: {e_region}")
action_applied = f"error: {e_region}"
2025-05-06 15:49:31 +08:00
if action_applied not in ["skipped", "empty_region_skipped", "rotate_skipped_size_mismatch", "rotate_skipped_not_square", "region_bounds_error_skipped"] and not action_applied.startswith("error"):
image = Image.fromarray(img_array)
logger.debug(f" 执行了区域操作: {action_applied} 在 ({x},{y}) 大小 ({region_w}x{region_h})")
else:
logger.debug(f" 区域变换未执行或跳过: {action_applied}")
else:
logger.debug("跳过区域变换 (概率未触发).")
# 6. 轻微高斯噪声 ...
apply_gaussian_noise = random.random() < gaussian_noise_prob
if gaussian_noise_sigma > 0 and apply_gaussian_noise:
logger.debug(f"应用高斯噪声, sigma={gaussian_noise_sigma:.1f}")
2025-05-06 16:34:46 +08:00
img_array = np.array(image)
noise = np.random.normal(0, gaussian_noise_sigma, img_array.shape)
2025-05-06 16:34:46 +08:00
img_array = np.clip(img_array + noise, 0, 255).astype(np.uint8)
image = Image.fromarray(img_array)
elif gaussian_noise_sigma > 0:
logger.debug("跳过高斯噪声 (概率未触发).")
# ... (对比日志不变) ...
2025-05-06 16:34:46 +08:00
logger.info(f"--- 完成优化抗哈希方法 (强度: {strength}) ---")
2025-05-06 15:49:31 +08:00
return image
def optimized_process_image(
2025-04-26 15:53:44 +08:00
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 15:53:44 +08:00
) -> Image.Image:
"""优化后的图像处理方法,使用更高效的算法,添加反查重技术"""
# 设置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed(seed)
np.random.seed(seed)
2025-04-26 15:53:44 +08:00
# 根据微调强度设置参数
2025-04-26 15:53:44 +08:00
if variation_strength == "low":
brightness_factor = random.uniform(0.97, 1.03)
contrast_factor = random.uniform(0.97, 1.03)
saturation_factor = random.uniform(0.97, 1.03)
2025-04-26 15:53:44 +08:00
max_rotation = 0.5
border_size = random.randint(0, 1)
use_extra = random.random() < 0.3 and extra_effects
2025-04-26 15:53:44 +08:00
elif variation_strength == "high":
brightness_factor = random.uniform(0.92, 1.08)
contrast_factor = random.uniform(0.92, 1.08)
saturation_factor = random.uniform(0.92, 1.08)
2025-04-26 15:53:44 +08:00
max_rotation = 2.0
border_size = random.randint(0, 3)
use_extra = extra_effects
else: # medium
brightness_factor = random.uniform(0.95, 1.05)
contrast_factor = random.uniform(0.95, 1.05)
saturation_factor = random.uniform(0.95, 1.05)
2025-04-26 15:53:44 +08:00
max_rotation = 1.0
border_size = random.randint(0, 2)
use_extra = random.random() < 0.7 and extra_effects
2025-04-26 15:53:44 +08:00
# 调整图像为目标比例
2025-04-26 15:53:44 +08:00
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 调整大小
2025-04-26 15:53:44 +08:00
if current_ratio > target_ratio_value: # 图片较宽
new_height = 1200
2025-04-26 15:53:44 +08:00
new_width = int(new_height * current_ratio)
else: # 图片较高
new_width = 900
2025-04-26 15:53:44 +08:00
new_height = int(new_width / current_ratio)
# 高效调整尺寸
2025-04-26 15:53:44 +08:00
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 裁剪为目标比例
2025-04-26 15:53:44 +08:00
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value:
2025-04-26 15:53:44 +08:00
crop_width = int(resized_height * target_ratio_value)
max_offset = max(1, min(10, (resized_width - crop_width) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
2025-04-26 15:53:44 +08:00
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else:
2025-04-26 15:53:44 +08:00
crop_height = int(resized_width / target_ratio_value)
max_offset = max(1, min(10, (resized_height - crop_height) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
2025-04-26 15:53:44 +08:00
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果不需要变化或是低强度且禁用额外效果
if not add_variation:
logger.info("add_variation=False跳过所有变化和抗哈希处理。")
# 重置随机种子
2025-04-26 15:53:44 +08:00
if seed is not None:
random.seed()
np.random.seed()
# 清除元数据后返回
return self.strip_metadata(result)
2025-04-26 15:53:44 +08:00
logger.info(f"应用基础变化和抗哈希处理 (强度: {variation_strength}, 额外效果: {use_extra})")
processed_image = result.convert('RGB')
2025-04-26 15:53:44 +08:00
# 1. 亮度调整
if abs(brightness_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Brightness(processed_image)
processed_image = enhancer.enhance(brightness_factor)
2025-04-26 15:53:44 +08:00
# 2. 对比度调整
if abs(contrast_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Contrast(processed_image)
processed_image = enhancer.enhance(contrast_factor)
# 3. 饱和度调整
if abs(saturation_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Color(processed_image)
processed_image = enhancer.enhance(saturation_factor)
# 4. 旋转 (只在中高强度时应用)
if variation_strength != "low" and abs(max_rotation) > 0.1:
rotation_angle = random.uniform(-max_rotation, max_rotation)
if abs(rotation_angle) > 0.1: # 只有当角度足够大时才旋转
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 5. 应用抗哈希技术
if use_extra:
logger.debug("调用 optimize_anti_hash_methods...")
2025-05-06 15:49:31 +08:00
processed_image = self.optimize_anti_hash_methods(processed_image, variation_strength)
else:
logger.info("use_extra=False跳过 optimize_anti_hash_methods。")
# 应用模糊/锐化/边框等额外效果 (如果 use_extra 为 True)
if use_extra:
logger.debug("应用额外效果 (模糊/锐化/边框)...")
# 根据强度决定是否应用特定效果
apply_sharpen = random.random() < 0.4
apply_blur = not apply_sharpen and random.random() < 0.3
2025-04-26 15:53:44 +08:00
# 锐化
if apply_sharpen:
enhancer = ImageEnhance.Sharpness(processed_image)
sharpness = 1.2 if variation_strength == "high" else 1.1
processed_image = enhancer.enhance(sharpness)
2025-04-26 15:53:44 +08:00
# 模糊
elif apply_blur:
radius = 0.7 if variation_strength == "high" else 0.4
processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
2025-04-26 15:53:44 +08:00
# 边框处理 (在图像不太小的情况下)
if border_size > 0 and min(processed_image.size) > 300:
border_color = (
random.randint(0, 5),
random.randint(0, 5),
random.randint(0, 5)
)
w, h = processed_image.size
bordered = Image.new('RGB', (w + border_size*2, h + border_size*2), border_color)
bordered.paste(processed_image, (border_size, border_size))
logger.debug("额外效果应用完成。")
else:
logger.info("use_extra=False跳过额外效果。")
# **关键:确保在所有修改之后调用修复后的 strip_metadata**
logger.debug("最后调用 strip_metadata 清除元数据。")
final_image = self.strip_metadata(processed_image)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
logger.debug("随机种子已重置。")
logger.info(f"图像处理完成 (强度: {variation_strength})")
return final_image
2025-04-26 14:53:54 +08:00
2025-05-06 16:34:46 +08:00
def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
"""
扰动图像的颜色直方图对抗基于颜色统计的图像匹配
Args:
image: 输入图像
strength: 扰动强度(0-1)
Returns:
处理后的图像
"""
logger.debug(f"扰动颜色直方图,强度: {strength:.3f}")
2025-05-06 16:34:46 +08:00
# 确保为RGB模式
if image.mode != 'RGB':
image = image.convert('RGB')
# 转为numpy数组
img_array = np.array(image)
height, width, channels = img_array.shape
# 对每个通道分别处理
for channel in range(channels):
# 计算当前通道的直方图
hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256))
# 找出主要颜色区间 (频率高的区间)
threshold = np.percentile(hist, 70) # 取前30%的颜色块
significant_bins = np.where(hist > threshold)[0]
if len(significant_bins) > 0:
for bin_idx in significant_bins:
# 计算当前bin对应的颜色范围
bin_width = 256 // 64
color_low = bin_idx * bin_width
color_high = (bin_idx + 1) * bin_width
# 创建颜色范围掩码
mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high)
if np.any(mask):
# 生成随机偏移值
offset = int(strength * bin_width * (random.random() - 0.5) * 2)
# 应用偏移确保在0-255范围内
img_array[:,:,channel][mask] = np.clip(
img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8)
# 转回PIL图像
logger.debug("颜色直方图扰动成功。")
2025-05-06 16:34:46 +08:00
return Image.fromarray(img_array)
def strip_metadata(self, image: Image.Image) -> Image.Image:
"""
移除图像中的所有元数据 (修复版)
2025-05-06 16:34:46 +08:00
Args:
image: 输入图像
Returns:
无元数据的图像
"""
logger.debug("移除图像元数据...")
try:
# 从当前图像对象的像素数据创建一个新的Image对象
# 这确保我们使用的是内存中修改后的数据
# 保留原始格式信息,如果可用
img_format = image.format if hasattr(image, 'format') else 'PNG'
# 如果图像有alpha通道需要正确处理
if image.mode == 'RGBA':
# 创建一个白色背景然后粘贴带有alpha的图像
background = Image.new("RGB", image.size, (255, 255, 255))
background.paste(image, mask=image.split()[3]) # 3 is the alpha channel
image_to_save = background
img_format = 'JPEG' # 通常去除alpha后存为JPEG
elif image.mode == 'P':
# 带调色板的图像转换为RGB
image_to_save = image.convert('RGB')
img_format = 'JPEG'
else:
image_to_save = image
# 保存到内存缓冲区
data = io.BytesIO()
# 确保保存时指定质量参数,避免默认压缩导致意外变化
if img_format == 'JPEG':
image_to_save.save(data, format='JPEG', quality=95) # 使用高质量JPEG
else:
# 对于PNG等无损格式不需要质量参数
image_to_save.save(data, format=img_format)
data.seek(0) # 重置缓冲区指针
reloaded_image = Image.open(data)
logger.debug("元数据移除成功。")
return reloaded_image
except Exception as e:
logger.error(f"移除元数据时出错: {e}")
return image # 出错时返回原图
2025-05-06 16:34:46 +08:00
2025-04-26 14:53:54 +08:00
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
2025-04-26 15:53:44 +08:00
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True
2025-04-26 14:53:54 +08:00
) -> List[str]:
"""
2025-04-26 15:53:44 +08:00
选择未被海报使用的图像作为额外配图并处理为3:4比例
2025-04-26 14:53:54 +08:00
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例
2025-04-26 14:53:54 +08:00
creator = PosterNotesCreator(output_handler)
# 使用优化后的方法处理图像
2025-04-26 14:53:54 +08:00
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
2025-04-26 15:53:44 +08:00
output_filename_template,
variation_strength,
extra_effects
2025-04-26 14:53:54 +08:00
)