TravelContentCreator/utils/poster_notes_creator.py

1710 lines
79 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import random
import logging
import json
from PIL import Image, ImageChops
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
from .output_handler import OutputHandler
import io
import math
from core.simple_collage import process_directory as process_collage
# 尝试导入 scipy如果失败则标记
try:
from scipy.fftpack import dct, idct
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
dct = None
idct = None
logger = logging.getLogger(__name__)
class PosterNotesCreator:
"""
处理原始海报作为主图,并随机选择额外的图片作为笔记图片。
确保选择的笔记图片与海报中使用的图片不重复。
"""
def __init__(self, output_handler: OutputHandler):
"""
初始化 PosterNotesCreator
Args:
output_handler: 可选的 OutputHandler 实例,用于处理输出
"""
self.output_handler = output_handler
logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
def create_additional_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int = 3,
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True,
collage_style: str = "grid_2x2" # 默认使用grid模式
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图使用2x2网格拼接多张图片
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要输出的额外配图数量默认为3
output_filename_template: 输出文件名模板
variation_strength: 变化强度,可以是 'low', 'medium', 'high'
extra_effects: 是否应用额外效果
collage_style: 拼图风格,固定为 'grid'
Returns:
List[str]: 保存的图像路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
# 获取候选图像 - 我们需要至少4*num_additional_images张图片
# num_source_images_needed = min(4 * num_additional_images, 12) # 限制最多12张源图 <-- 旧逻辑
total_images_needed = 4 * num_additional_images # 计算总共需要的唯一图片数量
candidate_images = self.get_candidate_images(
poster_metadata_path,
source_image_dir,
# num_source_images_needed <-- 旧逻辑
total_images_needed # 请求总共需要的图片数量
)
if not candidate_images:
logger.warning("没有找到合适的候选图像")
return []
# 检查是否有足够的图片来生成请求数量的配图
if len(candidate_images) < total_images_needed:
adjusted_num_images = len(candidate_images) // 4
logger.warning(
f"可用图像数量 ({len(candidate_images)}) 不足以生成 {num_additional_images} 张不重复的2x2配图 "
f"(需要 {total_images_needed} 张)。将只生成 {adjusted_num_images} 张配图。"
)
num_additional_images = adjusted_num_images
if num_additional_images == 0:
logger.warning("可用图像数量少于4张无法创建任何2x2拼图。")
return []
elif len(candidate_images) < 4: # 即使调整后检查是否仍少于4张
logger.warning(f"可用图像数量({len(candidate_images)})少于4张无法创建2x2拼图")
return []
# 生成唯一的随机种子
seed_str = f"{run_id}_{topic_index}_{variant_index}"
seed = sum(ord(c) for c in seed_str)
random.seed(seed)
logger.info(f"使用随机种子: {seed},基于: {seed_str}")
# 打乱候选图像顺序
random.shuffle(candidate_images)
# 使用多进程并行处理图像
saved_paths = []
with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, num_additional_images)) as executor:
# 创建任务
future_to_image_set = {}
start_index = 0 # 用于追踪从candidate_images中取图的起始位置
for i in range(num_additional_images):
# # 为每个输出选择4张不同的图片 <-- 旧逻辑,改为切片
# selected_indices = []
# # 确保我们有足够的图片可选择
# available_indices = list(range(len(candidate_images)))
# # 如果图片不够,我们可能需要重复使用一些图片
# if len(available_indices) < 4:
# selected_indices = available_indices * (4 // len(available_indices) + 1)
# selected_indices = selected_indices[:4]
# else:
# # 随机选择4个不同的索引
# selected_indices = random.sample(available_indices, 4)
# # 获取对应的图片文件名
# selected_images = [candidate_images[idx] for idx in selected_indices]
# --- 新逻辑:从打乱后的列表中顺序切片获取不重复的图像 ---
end_index = start_index + 4
if end_index > len(candidate_images): # 双重检查,理论上不应发生
logger.error(f"内部错误:尝试获取的图像索引超出范围 ({start_index}-{end_index}),可用图像: {len(candidate_images)}")
break
selected_images = candidate_images[start_index:end_index]
start_index = end_index # 更新下一个起始索引
# --- 结束新逻辑 ---
# 为每个拼图创建单独的种子
image_seed = seed + i
future = executor.submit(
self.process_multiple_images,
run_id,
topic_index,
variant_index,
source_image_dir,
selected_images,
i,
output_filename_template.format(index=i+1),
image_seed,
variation_strength,
extra_effects,
collage_style
)
future_to_image_set[future] = (i, selected_images)
# 收集结果
for future in concurrent.futures.as_completed(future_to_image_set):
i, selected_images = future_to_image_set[future]
try:
saved_path = future.result()
if saved_path:
saved_paths.append(saved_path)
logger.info(f"已保存额外配图 {i+1}/{num_additional_images}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{', '.join(selected_images)}': {e}")
logger.error(traceback.format_exc())
# 重置随机种子
random.seed()
return saved_paths
def process_multiple_images(
self,
run_id,
topic_index,
variant_index,
source_dir,
image_filenames,
index,
output_filename,
seed,
variation_strength,
extra_effects,
collage_style="grid_2x2"
):
"""处理多张图像创建2x2网格拼图"""
try:
# 使用core.simple_collage模块处理图像
style = "grid_2x2" # 固定使用grid风格
# 创建临时目录来存放图像以便传递给process_collage函数
import tempfile
import shutil
with tempfile.TemporaryDirectory() as temp_dir:
# 复制选中的图像到临时目录
temp_image_paths = []
for img_filename in image_filenames:
src_path = os.path.join(source_dir, img_filename)
dst_path = os.path.join(temp_dir, img_filename)
shutil.copy2(src_path, dst_path)
temp_image_paths.append(dst_path)
logger.info(f"为网格拼图准备了 {len(temp_image_paths)} 张图像: {', '.join(image_filenames)}")
# 设置随机种子以确保结果一致性
if seed is not None:
random.seed(seed)
np.random.seed(seed)
# 调用core.simple_collage模块处理图像
target_size = (900, 1200) # 3:4比例
collage_images, used_image_filenames = process_collage(
temp_dir,
style=style,
target_size=target_size,
output_count=1,
)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
if not collage_images or len(collage_images) == 0:
logger.error(f"拼图模块没有生成有效的图像")
return None
processed_image = collage_images[0]
# 确保图像是RGB模式解决"cannot write mode RGBA as JPEG"错误
if processed_image.mode == 'RGBA':
logger.debug(f"将RGBA图像转换为RGB模式")
# 创建白色背景并粘贴RGBA图像
background = Image.new('RGB', processed_image.size, (255, 255, 255))
background.paste(processed_image, mask=processed_image.split()[3]) # 使用alpha通道作为mask
processed_image = background
elif processed_image.mode != 'RGB':
logger.debug(f"{processed_image.mode}图像转换为RGB模式")
processed_image = processed_image.convert('RGB')
# 创建元数据
additional_metadata = {
"original_images": image_filenames,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"collage_style": style,
"grid_size": "2x2"
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理多张图像时出错: {e}")
logger.error(traceback.format_exc())
return None
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
random.seed(sum(map(ord, ''.join(available_images)))) # 确保结果一致性
selected_images = random.sample(available_images, num_images)
random.seed() # 重置随机种子
return selected_images
def process_single_image(
self,
run_id,
topic_index,
variant_index,
image_path,
image_filename,
index,
source_dir,
output_filename,
seed,
variation_strength,
extra_effects,
collage_style="grid_2x2"
):
"""处理单张图像 - 此方法可在独立进程中运行"""
try:
# 使用core.simple_collage模块处理图像
style = collage_style if collage_style else "slice"
# 创建临时目录来存放图像以便传递给process_collage函数
import tempfile
import shutil
with tempfile.TemporaryDirectory() as temp_dir:
# 复制图像到临时目录
temp_image_path = os.path.join(temp_dir, image_filename)
shutil.copy2(image_path, temp_image_path)
# 设置随机种子以确保结果一致性
if seed is not None:
random.seed(seed)
np.random.seed(seed)
# 调用core.simple_collage模块处理图像
target_size = (900, 1200) # 3:4比例
collage_images, used_image_filenames = process_collage(
temp_dir,
style=style,
target_size=target_size,
output_count=1
)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
if not collage_images or len(collage_images) == 0:
logger.error(f"拼图模块没有生成有效的图像: {image_filename}")
return None
processed_image = collage_images[0]
# 确保图像是RGB模式解决"cannot write mode RGBA as JPEG"错误
if processed_image.mode == 'RGBA':
logger.debug(f"将RGBA图像转换为RGB模式: {image_filename}")
# 创建白色背景并粘贴RGBA图像
background = Image.new('RGB', processed_image.size, (255, 255, 255))
background.paste(processed_image, mask=processed_image.split()[3]) # 使用alpha通道作为mask
processed_image = background
elif processed_image.mode != 'RGB':
logger.debug(f"{processed_image.mode}图像转换为RGB模式: {image_filename}")
processed_image = processed_image.convert('RGB')
# 创建元数据
additional_metadata = {
"original_image": image_filename,
"additional_index": index + 1,
"source_dir": source_dir,
"is_additional_image": True,
"processed": True,
"aspect_ratio": "3:4",
"collage_style": style
}
# 使用输出处理器保存图像
return self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'additional', # 图像类型为additional
processed_image,
output_filename,
additional_metadata
)
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
logger.error(traceback.format_exc())
return None
def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
"""
在DCT域添加噪声以对抗pHash (需要Scipy) - 强化版
Args:
image: 输入图像 (建议传入灰度图或处理亮度通道)
intensity: 噪声强度 (0-1)
block_size: DCT块大小 (通常为8)
Returns:
添加噪声后的图像
"""
if not SCIPY_AVAILABLE:
logger.warning("Scipy 未安装无法执行DCT噪声注入。请运行 'pip install scipy'")
# 可以选择返回原图,或执行一个简化的备用方案
# 这里我们返回原图
return image
try:
logger.debug(f"应用强化DCT噪声强度: {intensity:.3f}")
# 确保是灰度图或提取亮度通道 (这里以灰度为例)
if image.mode != 'L':
gray_image = image.convert('L')
else:
gray_image = image
img_array = np.array(gray_image, dtype=float)
h, w = img_array.shape
# 确保尺寸是块大小的倍数
h_pad = (block_size - h % block_size) % block_size
w_pad = (block_size - w % block_size) % block_size
if h_pad != 0 or w_pad != 0:
img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect')
padded_h, padded_w = img_array.shape
else:
padded_h, padded_w = h, w
# 定义目标系数范围 (例如排除DC的左上角4x4低频区域)
target_h, target_w = 4, 4
for y in range(0, padded_h, block_size):
for x in range(0, padded_w, block_size):
block = img_array[y:y+block_size, x:x+block_size]
dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
# --- 强化噪声逻辑 ---
# 1. 计算噪声幅度,不再完全依赖系数本身大小
noise_amplitude = intensity * 30 # 固定基础噪声幅度 (可调)
# 2. 生成噪声
noise = np.random.uniform(-noise_amplitude, noise_amplitude,
(min(block_size, target_h), min(block_size, target_w)))
# 3. 应用噪声到目标低频区域 (跳过DC)
noise_h, noise_w = noise.shape
# 确保索引不超过dct_block的实际大小
apply_h, apply_w = min(noise_h, dct_block.shape[0]), min(noise_w, dct_block.shape[1])
# 尝试乘性噪声 - 可能对保留结构更好一点
factor = np.random.uniform(1.0 - intensity * 0.8, 1.0 + intensity * 0.8,
(min(block_size, target_h), min(block_size, target_w)))
dct_block[0:apply_h, 0:apply_w] *= factor[0:apply_h, 0:apply_w]
dct_block[0, 0] /= factor[0, 0] # 恢复DC系数近似值
# --- 结束强化噪声逻辑 ---
idct_block = idct(idct(dct_block.T, norm='ortho').T, norm='ortho')
img_array[y:y+block_size, x:x+block_size] = idct_block
if h_pad != 0 or w_pad != 0:
img_array = img_array[:h, :w]
img_array = np.clip(img_array, 0, 255)
modified_gray = Image.fromarray(img_array.astype(np.uint8))
if image.mode == 'RGB' and gray_image is not image:
blend_factor = 0.35 # 稍微增加混合强度
r, g, b = image.split()
r = Image.blend(r, modified_gray, blend_factor)
g = Image.blend(g, modified_gray, blend_factor)
b = Image.blend(b, modified_gray, blend_factor)
merged_image = Image.merge('RGB', (r, g, b))
else:
merged_image = modified_gray
logger.debug("强化DCT噪声应用成功。")
return merged_image
except Exception as e:
logger.error(f"强化DCT噪声注入出错: {e}")
return image
def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
"""调用强化的 add_dct_noise 方法"""
logger.debug(f"调用强化add_dct_noise对抗pHash强度: {intensity:.3f}")
return self.add_dct_noise(image, intensity=intensity)
def apply_smart_crop_resize(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
应用智能裁剪和重缩放来抵抗哈希算法 (增强版)
Args:
image: 输入图像
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
try:
original_width, original_height = image.size
logger.debug(f"应用智能裁剪+重缩放 (强度: {strength}), 原始尺寸: {original_width}x{original_height}")
# 根据强度决定裁剪量 (增强)
if strength == "low":
max_crop = 3 # 原为 1
elif strength == "high":
max_crop = 10 # 原为 3
else: # medium
max_crop = 6 # 原为 2
logger.debug(f"增强型智能裁剪: max_crop = {max_crop} 像素")
# 随机决定每边的裁剪量
crop_left = random.randint(0, max_crop)
crop_top = random.randint(0, max_crop)
crop_right = random.randint(0, max_crop)
crop_bottom = random.randint(0, max_crop)
# 计算裁剪后的边界
left = crop_left
top = crop_top
right = original_width - crop_right
bottom = original_height - crop_bottom
# 确保裁剪后尺寸至少为1x1
if left >= right or top >= bottom:
logger.warning("智能裁剪计算无效(裁剪过多),尝试使用较小裁剪量。")
# 尝试减小裁剪量再次计算
safe_max_crop = min(original_width // 4, original_height // 4, max_crop) # 保证不裁掉整个图
crop_left = random.randint(0, safe_max_crop)
crop_top = random.randint(0, safe_max_crop)
crop_right = random.randint(0, safe_max_crop)
crop_bottom = random.randint(0, safe_max_crop)
left = crop_left
top = crop_top
right = original_width - crop_right
bottom = original_height - crop_bottom
if left >= right or top >= bottom: # 再次失败则跳过
logger.error("智能裁剪再次失败,跳过此步骤。")
return image
logger.debug(f" 裁剪参数: L={crop_left}, T={crop_top}, R={crop_right}, B={crop_bottom}")
logger.debug(f" 裁剪区域: ({left}, {top}, {right}, {bottom})")
# 执行裁剪
cropped_image = image.crop((left, top, right, bottom))
# 使用高质量插值将图像缩放回原始尺寸
logger.debug(f" 将裁剪后图像 ({cropped_image.width}x{cropped_image.height}) 缩放回 ({original_width}x{original_height})")
resampling_filter = Image.LANCZOS # 高质量插值
resized_image = cropped_image.resize((original_width, original_height), resample=resampling_filter)
logger.debug("智能裁剪+重缩放应用成功。")
return resized_image
except Exception as e:
logger.error(f"智能裁剪+重缩放时出错: {e}")
return image # 出错时返回原图
def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
"""
扰动图像的颜色直方图,对抗基于颜色统计的图像匹配
Args:
image: 输入图像
strength: 扰动强度(0-1)
Returns:
处理后的图像
"""
logger.debug(f"扰动颜色直方图,强度: {strength:.3f}")
# 确保为RGB模式
if image.mode != 'RGB':
image = image.convert('RGB')
# 转为numpy数组
img_array = np.array(image)
height, width, channels = img_array.shape
# 对每个通道分别处理
for channel in range(channels):
# 计算当前通道的直方图
hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256))
# 找出主要颜色区间 (频率高的区间)
threshold = np.percentile(hist, 70) # 取前30%的颜色块
significant_bins = np.where(hist > threshold)[0]
if len(significant_bins) > 0:
for bin_idx in significant_bins:
# 计算当前bin对应的颜色范围
bin_width = 256 // 64
color_low = bin_idx * bin_width
color_high = (bin_idx + 1) * bin_width
# 创建颜色范围掩码
mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high)
if np.any(mask):
# 生成随机偏移值
offset = int(strength * bin_width * (random.random() - 0.5) * 2)
# 应用偏移确保在0-255范围内
img_array[:,:,channel][mask] = np.clip(
img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8)
# 转回PIL图像
logger.debug("颜色直方图扰动成功。")
return Image.fromarray(img_array)
def strip_metadata(self, image: Image.Image) -> Image.Image:
"""
移除图像中的所有元数据 (修复版)
Args:
image: 输入图像
Returns:
无元数据的图像
"""
logger.debug("移除图像元数据...")
try:
# 确保图像处于适合保存的模式例如RGB
if image.mode == 'RGBA':
# 创建一个白色背景然后粘贴带有alpha的图像
background = Image.new("RGB", image.size, (255, 255, 255))
background.paste(image, mask=image.split()[3]) # 3 is the alpha channel
image_to_save = background
elif image.mode == 'P':
# 带调色板的图像转换为RGB
image_to_save = image.convert('RGB')
elif image.mode == 'L':
# 灰度图通常可以保存为JPEG或PNG
image_to_save = image
elif image.mode == 'RGB':
image_to_save = image # 已经是RGB直接使用
else:
logger.warning(f"未知的图像模式 {image.mode}尝试转换为RGB进行元数据剥离。")
image_to_save = image.convert('RGB')
# 保存到内存缓冲区强制使用JPEG格式以剥离元数据
data = io.BytesIO()
# --- FIX: 强制使用JPEG格式保存到缓冲区 ---
save_format = 'JPEG'
logger.debug(f"强制使用 {save_format} 格式保存以剥离元数据")
image_to_save.save(data, format=save_format, quality=95) # 使用高质量JPEG
# --- END FIX ---
data.seek(0) # 重置缓冲区指针
reloaded_image = Image.open(data)
logger.debug("元数据移除成功。")
return reloaded_image
except Exception as e:
logger.error(f"移除元数据时出错: {e}")
logger.error(traceback.format_exc()) # 打印详细错误
return image # 出错时返回原图
def apply_overlay_noise(self, image: Image.Image, alpha: int = 10, noise_type: str = 'uniform') -> Image.Image:
"""
在图像上叠加一个低透明度的噪声图层
Args:
image: 输入图像
alpha: 叠加噪声图层的 Alpha 值 (0-255)
noise_type: 'gaussian''uniform'
Returns:
叠加噪声后的图像
"""
try:
logger.debug(f"应用低透明度噪声叠加: alpha={alpha}, type={noise_type}")
# 确保图像是 RGBA 模式以处理透明度
if image.mode != 'RGBA':
base_image = image.convert('RGBA')
else:
base_image = image.copy() # 操作副本
width, height = base_image.size
# 创建噪声图层 (灰度噪声即可)
if noise_type == 'gaussian':
# 生成范围在 0-255 的高斯噪声均值128
noise_array = np.random.normal(loc=128, scale=40, size=(height, width)).clip(0, 255).astype(np.uint8)
else: # uniform
noise_array = np.random.randint(0, 256, size=(height, width), dtype=np.uint8)
noise_image = Image.fromarray(noise_array, mode='L')
# 将噪声灰度图转换为 RGBA并设置 alpha 通道
noise_rgba = noise_image.convert('RGBA')
# 创建一个全为指定 alpha 值的通道
alpha_channel = Image.new('L', noise_image.size, alpha)
noise_rgba.putalpha(alpha_channel)
# 使用 alpha_composite 进行混合叠加
# alpha_composite 要求两个输入都是 RGBA
combined_image = Image.alpha_composite(base_image, noise_rgba)
# 通常我们希望最终结果是 RGB所以转换回去
# 如果原图就是 RGBA 且需要保留透明度,则省略此步
final_image = combined_image.convert('RGB')
logger.debug("低透明度噪声叠加应用成功。")
return final_image
except Exception as e:
logger.error(f"应用叠加噪声时出错: {e}")
logger.error(traceback.format_exc()) # 打印详细错误
return image # 出错时返回原图
def apply_ahash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
专门针对aHash的干扰方法插入亮度带 (增强版)
Args:
image: 输入图像
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
try:
# 设定强度相关参数 (增强)
if strength == "low":
intensity = 0.08 # 原为 0.02
bands = 2
elif strength == "high":
intensity = 0.18 # 原为 0.04
bands = 4
else: # medium
intensity = 0.12 # 原为 0.03
bands = 3
logger.debug(f"应用aHash特定干扰 (亮度带) (增强版), 强度:{strength}, 条带数:{bands}, 强度因子:{intensity:.3f}")
# ... (其余逻辑不变) ...
result = image.copy()
width, height = result.size
pixels = result.load()
is_horizontal = random.choice([True, False])
band_positions = []
if is_horizontal:
for _ in range(bands):
base_pos = random.randint(0, height - 1)
band_positions.append(base_pos)
else:
for _ in range(bands):
base_pos = random.randint(0, width - 1)
band_positions.append(base_pos)
for y_idx in range(height): # Renamed y to y_idx to avoid conflict
for x_idx in range(width): # Renamed x to x_idx to avoid conflict
is_on_band = False
if is_horizontal:
for pos in band_positions:
if abs(y_idx - pos) <= 1:
is_on_band = True
break
else:
for pos in band_positions:
if abs(x_idx - pos) <= 1:
is_on_band = True
break
if is_on_band:
pixel = pixels[x_idx, y_idx]
if isinstance(pixel, int):
r_val = g_val = b_val = pixel # Renamed r,g,b to r_val, g_val, b_val
is_rgb = False
else:
if len(pixel) >= 3:
r_val, g_val, b_val = pixel[0], pixel[1], pixel[2]
is_rgb = True
else:
continue
factor = 1.0 + intensity * (1 if random.random() > 0.5 else -1)
r_val, g_val, b_val = int(r_val * factor), int(g_val * factor), int(b_val * factor)
r_val, g_val, b_val = max(0, min(255, r_val)), max(0, min(255, g_val)), max(0, min(255, b_val))
if is_rgb:
if len(pixel) == 4:
pixels[x_idx, y_idx] = (r_val, g_val, b_val, pixel[3])
else:
pixels[x_idx, y_idx] = (r_val, g_val, b_val)
else:
pixels[x_idx, y_idx] = r_val
logger.debug(f"aHash特定干扰完成: {'水平' if is_horizontal else '垂直'}亮度带")
return result
except Exception as e:
logger.error(f"应用aHash特定干扰时出错: {e}")
logger.error(traceback.format_exc())
return image
def apply_dhash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
专门针对dHash的干扰方法梯度反向模式 (增强版)
Args:
image: 输入图像
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
try:
# 设定强度相关参数 (增强)
if strength == "low":
gradient_strength = 0.08 # 原为 0.02
regions = 2
elif strength == "high":
gradient_strength = 0.18 # 原为 0.04
regions = 4
else: # medium
gradient_strength = 0.12 # 原为 0.03
regions = 3
logger.debug(f"应用dHash特定干扰 (梯度反向) (增强版), 强度:{strength}, 区域数:{regions}, 梯度强度:{gradient_strength:.3f}")
# ... (其余逻辑不变, 确保使用增强的 gradient_strength) ...
result = image.copy()
width, height = result.size
for _ in range(regions):
region_w = random.randint(width//12, width//8) # Renamed region_width to region_w
region_h = random.randint(height//12, height//8) # Renamed region_height to region_h
region_x_coord = random.randint(0, width - region_w) # Renamed region_x to region_x_coord
region_y_coord = random.randint(0, height - region_h) # Renamed region_y to region_y_coord
region = result.crop((region_x_coord, region_y_coord, region_x_coord + region_w, region_y_coord + region_h))
region_array = np.array(region)
is_rgb = len(region_array.shape) == 3
if is_rgb:
gray_region = np.mean(region_array, axis=2).astype(np.uint8)
else:
gray_region = region_array
h_gradients = np.zeros_like(gray_region, dtype=np.int16)
v_gradients = np.zeros_like(gray_region, dtype=np.int16)
for y_idx in range(region_h): # Renamed y to y_idx
for x_idx in range(region_w-1): # Renamed x to x_idx
h_gradients[y_idx, x_idx] = int(gray_region[y_idx, x_idx+1]) - int(gray_region[y_idx, x_idx])
for y_idx in range(region_h-1): # Renamed y to y_idx
for x_idx in range(region_w): # Renamed x to x_idx
v_gradients[y_idx, x_idx] = int(gray_region[y_idx+1, x_idx]) - int(gray_region[y_idx, x_idx])
modified_region = region_array.astype(np.float32)
for y_idx in range(region_h): # Renamed y to y_idx
for x_idx in range(region_w): # Renamed x to x_idx
if x_idx < region_w-1 and abs(h_gradients[y_idx, x_idx]) > 5:
h_change = -h_gradients[y_idx, x_idx] * gradient_strength
if is_rgb:
for c_channel in range(3): # Renamed c to c_channel
modified_region[y_idx, x_idx+1, c_channel] = np.clip(
modified_region[y_idx, x_idx+1, c_channel] + h_change/2, 0, 255)
modified_region[y_idx, x_idx, c_channel] = np.clip(
modified_region[y_idx, x_idx, c_channel] - h_change/2, 0, 255)
else:
modified_region[y_idx, x_idx+1] = np.clip(
modified_region[y_idx, x_idx+1] + h_change/2, 0, 255)
modified_region[y_idx, x_idx] = np.clip(
modified_region[y_idx, x_idx] - h_change/2, 0, 255)
if y_idx < region_h-1 and abs(v_gradients[y_idx, x_idx]) > 5:
v_change = -v_gradients[y_idx, x_idx] * gradient_strength
if is_rgb:
for c_channel in range(3): # Renamed c to c_channel
modified_region[y_idx+1, x_idx, c_channel] = np.clip(
modified_region[y_idx+1, x_idx, c_channel] + v_change/2, 0, 255)
modified_region[y_idx, x_idx, c_channel] = np.clip(
modified_region[y_idx, x_idx, c_channel] - v_change/2, 0, 255)
else:
modified_region[y_idx+1, x_idx] = np.clip(
modified_region[y_idx+1, x_idx] + v_change/2, 0, 255)
modified_region[y_idx, x_idx] = np.clip(
modified_region[y_idx, x_idx] - v_change/2, 0, 255)
modified_region = modified_region.astype(np.uint8)
modified_region_image = Image.fromarray(modified_region)
result.paste(modified_region_image, (region_x_coord, region_y_coord))
logger.debug(f"dHash特定干扰完成: 在{regions}个区域应用梯度反向")
return result
except Exception as e:
logger.error(f"应用dHash特定干扰时出错: {e}")
logger.error(traceback.format_exc())
return image
def apply_phash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
专门针对pHash的干扰方法定向DCT系数修改 (增强版)
Args:
image: 输入图像
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
if not SCIPY_AVAILABLE:
logger.warning("Scipy 未安装无法执行pHash专用干扰。请运行 'pip install scipy'")
return image
try:
# 设定强度相关参数 (增强)
if strength == "low":
intensity = 0.20 # 原为 0.10
key_positions_count = 4
elif strength == "high":
intensity = 0.40 # 原为 0.20
key_positions_count = 8
else: # medium
intensity = 0.30 # 原为 0.15
key_positions_count = 6
logger.debug(f"应用pHash特定干扰 (定向DCT干扰) (增强版), 强度:{strength}, 密度:{key_positions_count}, 强度因子:{intensity:.2f}")
# ... (其余逻辑不变, 确保使用增强的 intensity) ...
gray_image = image.convert('L')
img_array_np = np.array(gray_image) # Renamed img_array to img_array_np
h_img, w_img = img_array_np.shape # Renamed h,w to h_img,w_img
resized_array = np.array(gray_image.resize((32, 32), Image.LANCZOS))
dct_array = dct(dct(resized_array.T, norm='ortho').T, norm='ortho')
key_positions = []
for i_pos in range(1, 8): # Renamed i to i_pos
for j_pos in range(1, 8): # Renamed j to j_pos
key_positions.append((i_pos, j_pos))
selected_positions = random.sample(key_positions, k=min(len(key_positions), key_positions_count))
block_h_size, block_w_size = h_img // 32, w_img // 32 # Renamed block_height, block_width
for dct_y_coord, dct_x_coord in selected_positions: # Renamed dct_y, dct_x
orig_y_coord = dct_y_coord * block_h_size # Renamed orig_y to orig_y_coord
orig_x_coord = dct_x_coord * block_w_size # Renamed orig_x to orig_x_coord
pattern_s = min(block_h_size, block_w_size) # Renamed pattern_size to pattern_s
for y_off in range(pattern_s): # Renamed y_offset to y_off
for x_off in range(pattern_s): # Renamed x_offset to x_off
y_val = orig_y_coord + y_off # Renamed y to y_val
x_val = orig_x_coord + x_off # Renamed x to x_val
if 0 <= y_val < h_img and 0 <= x_val < w_img:
offset_val = intensity * 20 * math.sin(2 * math.pi * (y_off / pattern_s)) * \
math.cos(2 * math.pi * (x_off / pattern_s)) # Renamed offset to offset_val
img_array_np[y_val, x_val] = np.clip(img_array_np[y_val, x_val] + offset_val, 0, 255)
result_img = Image.fromarray(img_array_np.astype(np.uint8)) # Renamed result to result_img
if image.mode != 'L':
r_channel, g_channel, b_channel = image.split()[:3] # Renamed r,g,b to r_channel, g_channel, b_channel
diff_img = ImageChops.difference(gray_image, result_img) # Renamed diff to diff_img
diff_array_np = np.array(diff_img) # Renamed diff_array to diff_array_np
r_array_np = np.array(r_channel) # Renamed r_array to r_array_np
g_array_np = np.array(g_channel) # Renamed g_array to g_array_np
b_array_np = np.array(b_channel) # Renamed b_array to b_array_np
transfer_factor = 0.8
r_array_np = np.clip(r_array_np + diff_array_np * transfer_factor, 0, 255).astype(np.uint8)
g_array_np = np.clip(g_array_np + diff_array_np * transfer_factor, 0, 255).astype(np.uint8)
b_array_np = np.clip(b_array_np + diff_array_np * transfer_factor, 0, 255).astype(np.uint8)
r_new_img = Image.fromarray(r_array_np) # Renamed r_new to r_new_img
g_new_img = Image.fromarray(g_array_np) # Renamed g_new to g_new_img
b_new_img = Image.fromarray(b_array_np) # Renamed b_new to b_new_img
if image.mode == 'RGBA':
alpha_channel = image.split()[3] # Renamed a to alpha_channel
result_img = Image.merge('RGBA', (r_new_img, g_new_img, b_new_img, alpha_channel))
else:
result_img = Image.merge('RGB', (r_new_img, g_new_img, b_new_img))
logger.debug(f"pHash特定干扰完成: 修改了{len(selected_positions)}个DCT关键位置")
return result_img
except Exception as e:
logger.error(f"应用pHash特定干扰时出错: {e}")
logger.error(traceback.format_exc())
return image
def apply_block_based_perturbations(self, image: Image.Image, block_size: int = 16, strength: str = "medium") -> Image.Image:
"""
对图像各个块应用不同的、独立的干扰策略 (增强版)
Args:
image: 输入图像
block_size: 块大小
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
try:
# 设定强度相关参数 (增强)
if strength == "low":
factor_range = 0.08 # 原为 0.03
skip_prob = 0.5
elif strength == "high":
factor_range = 0.18 # 原为 0.06
skip_prob = 0.2
else: # medium
factor_range = 0.12 # 原为 0.045
skip_prob = 0.35
logger.debug(f"应用块级混合干扰 (增强版), 块大小:{block_size}, 强度:{strength}, 因子范围:{factor_range:.3f}")
# ... (其余逻辑不变, 确保使用增强的 factor_range) ...
result = image.copy() # Renamed result_img to result
width, height = image.size
img_array = np.array(result)
is_rgb = len(img_array.shape) == 3
strategies = ['brightness', 'contrast', 'hue_shift', 'gradient_flip', 'micro_pattern', 'skip']
processed_blocks = 0
skipped_blocks = 0
for y_coord in range(0, height, block_size): # Renamed y to y_coord
for x_coord in range(0, width, block_size): # Renamed x to x_coord
block_w = min(block_size, width - x_coord)
block_h = min(block_size, height - y_coord)
if block_w < 4 or block_h < 4:
continue
current_strategy = 'skip' if random.random() < skip_prob else random.choice([s for s in strategies if s != 'skip']) # Renamed strategy to current_strategy
if is_rgb:
current_block = img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w, :] # Renamed block to current_block
else:
current_block = img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w]
if current_strategy == 'skip':
skipped_blocks +=1
elif current_strategy == 'brightness':
factor = 1.0 + random.uniform(-factor_range, factor_range)
current_block = (current_block.astype(float) * factor).clip(0, 255).astype(np.uint8)
processed_blocks += 1
elif current_strategy == 'contrast':
factor = 1.0 + random.uniform(-factor_range, factor_range)
if is_rgb:
mean_val = np.mean(current_block, axis=(0, 1), keepdims=True)
current_block = (((current_block.astype(float) - mean_val) * factor) + mean_val).clip(0, 255).astype(np.uint8)
else:
mean_val = np.mean(current_block)
current_block = (((current_block.astype(float) - mean_val) * factor) + mean_val).clip(0, 255).astype(np.uint8)
processed_blocks += 1
elif current_strategy == 'hue_shift' and is_rgb:
r_factor = 1.0 - random.uniform(0, factor_range/2)
g_factor = 1.0 - random.uniform(0, factor_range/2)
b_factor = 1.0 - random.uniform(0, factor_range/2)
r_ch, g_ch, b_ch = current_block[:,:,0], current_block[:,:,1], current_block[:,:,2] # Renamed r,g,b to r_ch,g_ch,b_ch
current_block[:,:,0] = (r_ch * r_factor + g_ch * (1-r_factor)).clip(0, 255).astype(np.uint8)
current_block[:,:,1] = (g_ch * g_factor + b_ch * (1-g_factor)).clip(0, 255).astype(np.uint8)
current_block[:,:,2] = (b_ch * b_factor + r_ch * (1-b_factor)).clip(0, 255).astype(np.uint8)
processed_blocks += 1
elif current_strategy == 'gradient_flip':
if block_w > 2 and block_h > 2:
mid_w, mid_h = block_w // 2, block_h // 2
pattern_s = min(mid_w, mid_h) # Renamed pattern_size to pattern_s
for by_idx in range(1, pattern_s-1): # Renamed by to by_idx
for bx_idx in range(1, pattern_s-1): # Renamed bx to bx_idx
if is_rgb:
curr_val = np.mean(current_block[by_idx, bx_idx, :]) # Renamed curr to curr_val
right_val = np.mean(current_block[by_idx, bx_idx+1, :]) # Renamed right to right_val
below_val = np.mean(current_block[by_idx+1, bx_idx, :]) # Renamed below to below_val
if abs(curr_val - right_val) > 5:
diff_val = (curr_val - right_val) * factor_range # Renamed diff to diff_val
current_block[by_idx, bx_idx, :] = np.clip(current_block[by_idx, bx_idx, :] - diff_val/2, 0, 255).astype(np.uint8)
current_block[by_idx, bx_idx+1, :] = np.clip(current_block[by_idx, bx_idx+1, :] + diff_val/2, 0, 255).astype(np.uint8)
if abs(curr_val - below_val) > 5:
diff_val = (curr_val - below_val) * factor_range
current_block[by_idx, bx_idx, :] = np.clip(current_block[by_idx, bx_idx, :] - diff_val/2, 0, 255).astype(np.uint8)
current_block[by_idx+1, bx_idx, :] = np.clip(current_block[by_idx+1, bx_idx, :] + diff_val/2, 0, 255).astype(np.uint8)
else:
curr_val = float(current_block[by_idx, bx_idx])
right_val = float(current_block[by_idx, bx_idx+1])
below_val = float(current_block[by_idx+1, bx_idx])
if abs(curr_val - right_val) > 5:
diff_val = (curr_val - right_val) * factor_range
current_block[by_idx, bx_idx] = np.clip(current_block[by_idx, bx_idx] - diff_val/2, 0, 255).astype(np.uint8)
current_block[by_idx, bx_idx+1] = np.clip(current_block[by_idx, bx_idx+1] + diff_val/2, 0, 255).astype(np.uint8)
if abs(curr_val - below_val) > 5:
diff_val = (curr_val - below_val) * factor_range
current_block[by_idx, bx_idx] = np.clip(current_block[by_idx, bx_idx] - diff_val/2, 0, 255).astype(np.uint8)
current_block[by_idx+1, bx_idx] = np.clip(current_block[by_idx+1, bx_idx] + diff_val/2, 0, 255).astype(np.uint8)
processed_blocks += 1
elif current_strategy == 'micro_pattern':
pattern_type = random.choice(['dot', 'line', 'cross'])
center_y_coord, center_x_coord = block_h // 2, block_w // 2 # Renamed center_y, center_x
pattern_coords = []
if pattern_type == 'dot':
pattern_coords = [(center_y_coord, center_x_coord)]
elif pattern_type == 'line':
if random.choice([True, False]):
pattern_coords = [(center_y_coord, cx_val) for cx_val in range(center_x_coord-1, center_x_coord+2)] # Renamed cx to cx_val
else:
pattern_coords = [(cy_val, center_x_coord) for cy_val in range(center_y_coord-1, center_y_coord+2)] # Renamed cy to cy_val
else:
pattern_coords.extend([(center_y_coord, cx_val) for cx_val in range(center_x_coord-1, center_x_coord+2)])
pattern_coords.extend([(cy_val, center_x_coord) for cy_val in range(center_y_coord-1, center_y_coord+2) if (cy_val, center_x_coord) not in pattern_coords])
pattern_strength = random.uniform(factor_range*50, factor_range*100)
for py_coord, px_coord in pattern_coords: # Renamed py,px to py_coord,px_coord
if 0 <= py_coord < block_h and 0 <= px_coord < block_w:
if is_rgb:
target_channel = random.randint(0, 2) # Renamed channel to target_channel
if random.choice([True, False]):
current_block[py_coord, px_coord, target_channel] = np.clip(current_block[py_coord, px_coord, target_channel] + pattern_strength, 0, 255).astype(np.uint8)
else:
current_block[py_coord, px_coord, target_channel] = np.clip(current_block[py_coord, px_coord, target_channel] - pattern_strength, 0, 255).astype(np.uint8)
else:
if random.choice([True, False]):
current_block[py_coord, px_coord] = np.clip(current_block[py_coord, px_coord] + pattern_strength, 0, 255).astype(np.uint8)
else:
current_block[py_coord, px_coord] = np.clip(current_block[py_coord, px_coord] - pattern_strength, 0, 255).astype(np.uint8)
processed_blocks += 1
if is_rgb:
img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w, :] = current_block
else:
img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w] = current_block
result = Image.fromarray(img_array) # Result was already defined
logger.debug(f"块级混合干扰完成: 处理了{processed_blocks}个块, 跳过了{skipped_blocks}个块")
return result
except Exception as e:
logger.error(f"应用块级混合干扰时出错: {e}")
logger.error(traceback.format_exc())
return image
def apply_strategic_hash_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""
战略性哈希干扰:对各种哈希算法进行有针对性的干扰
整合了多种针对性干扰策略,包括块级混合干扰和针对特定哈希算法的干扰。
Args:
image: 输入图像
strength: 处理强度 ('low', 'medium', 'high')
Returns:
处理后的图像
"""
try:
logger.info(f"开始战略性哈希干扰 (强度: {strength})")
original_image_for_logging = image.copy()
# 设定策略应用概率
if strength == "low":
ahash_prob = 0.7
dhash_prob = 0.7
phash_prob = 0.9
block_prob = 0.6
block_size = 24
elif strength == "high":
ahash_prob = 0.9
dhash_prob = 0.9
phash_prob = 0.95
block_prob = 0.8
block_size = 16
else: # medium
ahash_prob = 0.8
dhash_prob = 0.8
phash_prob = 0.9
block_prob = 0.7
block_size = 20
logger.debug(f"策略概率: aHash={ahash_prob:.1f}, dHash={dhash_prob:.1f}, pHash={phash_prob:.1f}, 块级={block_prob:.1f}")
# 保存原图
result = image.copy()
applied_strategies = []
# 1. 应用块级混合干扰
if random.random() < block_prob:
result = self.apply_block_based_perturbations(result, block_size=block_size, strength=strength)
applied_strategies.append(f"BlockBased({block_size})")
# 2. 应用针对特定哈希算法的干扰
# 2.1 aHash特定干扰
if random.random() < ahash_prob:
result = self.apply_ahash_specific_disruption(result, strength)
applied_strategies.append("aHash")
# 2.2 dHash特定干扰
if random.random() < dhash_prob:
result = self.apply_dhash_specific_disruption(result, strength)
applied_strategies.append("dHash")
# 2.3 pHash特定干扰最重要的一个
if random.random() < phash_prob:
result = self.apply_phash_specific_disruption(result, strength)
applied_strategies.append("pHash")
logger.info(f"已应用战略干扰: {', '.join(applied_strategies)}")
# 对比修改前后
try:
diff = ImageChops.difference(original_image_for_logging.convert('RGB'), result.convert('RGB')).getbbox()
if diff:
logger.info(f"图像已修改。差异区域: {diff}")
else:
logger.warning("!!!战略干扰似乎未修改图像!!!")
except Exception as log_e:
logger.warning(f"无法比较图像差异: {log_e}")
logger.info(f"战略性哈希干扰完成 (强度: {strength})")
return result
except Exception as e:
logger.error(f"应用战略性哈希干扰时出错: {e}")
logger.error(traceback.format_exc())
return image # 出错时返回原图
def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image:
"""优化后的哈希对抗方法,使用新的分层增强策略"""
logger.info(f"--- 开始优化抗哈希方法 (强度: {strength}) - 分层增强策略 ---")
processed_image = image.copy()
# 定义各阶段强度参数
global_max_crop: int
global_overlay_alpha: int
global_color_hist_strength: float
if strength == "low":
global_max_crop = 3
global_overlay_alpha = random.randint(8, 12)
global_color_hist_strength = 0.03
elif strength == "high":
global_max_crop = 10
global_overlay_alpha = random.randint(18, 25)
global_color_hist_strength = 0.08
else: # medium
global_max_crop = 6
global_overlay_alpha = random.randint(12, 18)
global_color_hist_strength = 0.05
logger.debug(f"分层策略 - 全局扰动参数: strength_for_crop='{strength}' (内部max_crop将按新标准), overlay_alpha={global_overlay_alpha}, color_hist_strength={global_color_hist_strength:.3f}")
# --- 层 1: 基础全局扰动 ---
logger.info("应用基础全局扰动...")
# 1.1 智能裁剪 + 重缩放 (现在 apply_smart_crop_resize 内部已增强)
processed_image = self.apply_smart_crop_resize(processed_image, strength)
# 1.2 低透明度噪声叠加
processed_image = self.apply_overlay_noise(processed_image, alpha=global_overlay_alpha, noise_type='uniform')
# 1.3 颜色直方图扰动
if global_color_hist_strength > 0: # 确保强度大于0才应用
processed_image = self.perturb_color_histogram(processed_image, strength=global_color_hist_strength)
# --- 层 2: 战略性哈希干扰 (在基础扰动之上) ---
# apply_strategic_hash_disruption 内部调用的各 specific 和 block_based 方法已增强
logger.info("应用战略性哈希干扰 (各子方法已增强)...")
processed_image = self.apply_strategic_hash_disruption(processed_image, strength)
# --- 清除元数据 ---
processed_image = self.strip_metadata(processed_image)
logger.info(f"--- 完成优化抗哈希方法 (强度: {strength}) - 分层增强策略 ---")
return processed_image
def optimized_process_image(
self,
image: Image.Image,
target_ratio: Tuple[int, int],
add_variation: bool = True,
seed: int = None,
variation_strength: str = "medium",
extra_effects: bool = True
) -> Image.Image:
"""优化后的图像处理方法,使用更高效的算法,添加反查重技术"""
# 设置随机种子
if seed is not None:
random.seed(seed)
np.random.seed(seed)
# 根据微调强度设置参数 (保留变化因子等)
if variation_strength == "low":
brightness_factor = random.uniform(0.97, 1.03)
contrast_factor = random.uniform(0.97, 1.03)
saturation_factor = random.uniform(0.97, 1.03)
max_rotation = 0.5
border_size = random.randint(0, 1)
# use_extra = random.random() < 0.3 and extra_effects #<-- 旧逻辑
elif variation_strength == "high":
brightness_factor = random.uniform(0.92, 1.08)
contrast_factor = random.uniform(0.92, 1.08)
saturation_factor = random.uniform(0.92, 1.08)
max_rotation = 2.0
border_size = random.randint(0, 3)
# use_extra = extra_effects #<-- 旧逻辑 (本身就是直接赋值)
else: # medium
brightness_factor = random.uniform(0.95, 1.05)
contrast_factor = random.uniform(0.95, 1.05)
saturation_factor = random.uniform(0.95, 1.05)
max_rotation = 1.0
border_size = random.randint(0, 2)
# use_extra = random.random() < 0.7 and extra_effects #<-- 旧逻辑
# --- FIX: 直接使用传入的 extra_effects 控制是否启用抗哈希和额外效果 ---
use_extra = extra_effects
# --- END FIX ---
# 调整图像为目标比例
width, height = image.size
current_ratio = width / height
target_ratio_value = target_ratio[0] / target_ratio[1]
# 调整大小
if current_ratio > target_ratio_value: # 图片较宽
new_height = 1200
new_width = int(new_height * current_ratio)
else: # 图片较高
new_width = 900
new_height = int(new_width / current_ratio)
# 高效调整尺寸
resized_image = image.resize((new_width, new_height), Image.LANCZOS)
# 裁剪为目标比例
resized_width, resized_height = resized_image.size
if resized_width / resized_height > target_ratio_value:
crop_width = int(resized_height * target_ratio_value)
max_offset = max(1, min(10, (resized_width - crop_width) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
crop_x2 = crop_x1 + crop_width
result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
else:
crop_height = int(resized_width / target_ratio_value)
max_offset = max(1, min(10, (resized_height - crop_height) // 10))
offset = random.randint(-max_offset, max_offset) if add_variation else 0
crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
crop_y2 = crop_y1 + crop_height
result = resized_image.crop((0, crop_y1, resized_width, crop_y2))
# 如果不需要变化或是低强度且禁用额外效果
if not add_variation:
logger.info("add_variation=False跳过所有变化和抗哈希处理。")
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
# 清除元数据后返回
return self.strip_metadata(result)
logger.info(f"应用基础变化和抗哈希处理 (强度: {variation_strength}, 额外效果: {use_extra})")
processed_image = result.convert('RGB')
# 1. 亮度调整
if abs(brightness_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Brightness(processed_image)
processed_image = enhancer.enhance(brightness_factor)
# 2. 对比度调整
if abs(contrast_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Contrast(processed_image)
processed_image = enhancer.enhance(contrast_factor)
# 3. 饱和度调整
if abs(saturation_factor - 1.0) > 0.01:
enhancer = ImageEnhance.Color(processed_image)
processed_image = enhancer.enhance(saturation_factor)
# 4. 旋转 (只在中高强度时应用)
if variation_strength != "low" and abs(max_rotation) > 0.1:
rotation_angle = random.uniform(-max_rotation, max_rotation)
if abs(rotation_angle) > 0.1: # 只有当角度足够大时才旋转
processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
# 5. 应用抗哈希技术
if use_extra:
logger.debug("调用 optimize_anti_hash_methods...")
processed_image = self.optimize_anti_hash_methods(processed_image, variation_strength)
else:
logger.info("use_extra=False跳过 optimize_anti_hash_methods。")
# 应用模糊/锐化/边框等额外效果 (如果 use_extra 为 True)
if use_extra:
logger.debug("应用额外效果 (模糊/锐化/边框)...")
# 根据强度决定是否应用特定效果
apply_sharpen = random.random() < 0.4
apply_blur = not apply_sharpen and random.random() < 0.3
# 锐化
if apply_sharpen:
enhancer = ImageEnhance.Sharpness(processed_image)
sharpness = 1.2 if variation_strength == "high" else 1.1
processed_image = enhancer.enhance(sharpness)
# 模糊
elif apply_blur:
radius = 0.7 if variation_strength == "high" else 0.4
processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
# 边框处理 (在图像不太小的情况下)
if border_size > 0 and min(processed_image.size) > 300:
border_color = (
random.randint(0, 5),
random.randint(0, 5),
random.randint(0, 5)
)
w, h = processed_image.size
bordered = Image.new('RGB', (w + border_size*2, h + border_size*2), border_color)
bordered.paste(processed_image, (border_size, border_size))
logger.debug("额外效果应用完成。")
else:
logger.info("use_extra=False跳过额外效果。")
# **关键:确保在所有修改之后调用修复后的 strip_metadata**
logger.debug("最后调用 strip_metadata 清除元数据。")
final_image = self.strip_metadata(processed_image)
# 重置随机种子
if seed is not None:
random.seed()
np.random.seed()
logger.debug("随机种子已重置。")
logger.info(f"图像处理完成 (强度: {variation_strength})")
return final_image
def process_poster_for_notes(
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
处理海报并创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
# 验证输入
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
# 创建处理器实例并处理
creator = PosterNotesCreator(output_handler)
return creator.create_notes_images(
run_id,
topic_index,
variant_index,
poster_image_path,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template
)
def select_additional_images(
run_id: str,
topic_index: int,
variant_index: int,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_handler: OutputHandler,
output_filename_template: str = "additional_{index}.jpg",
variation_strength: str = "medium",
extra_effects: bool = True,
collage_style: str = "grid_2x2" # 默认使用grid风格
) -> List[str]:
"""
选择未被海报使用的图像作为额外配图创建2x2网格拼接图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_handler: 输出处理器
output_filename_template: 输出文件名模板
variation_strength: 变化强度
extra_effects: 是否应用额外效果
collage_style: 拼图风格,固定为 'grid'
Returns:
List[str]: 保存的图像路径列表
"""
logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图2x2网格风格")
# 验证输入
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
# 创建处理器实例
creator = PosterNotesCreator(output_handler)
# 使用拼图处理图像
return creator.create_additional_images(
run_id,
topic_index,
variant_index,
poster_metadata_path,
source_image_dir,
num_additional_images,
output_filename_template,
variation_strength,
extra_effects,
collage_style
)