import os
import random
import logging
import json
import traceback
import io
import concurrent.futures
from typing import List, Tuple, Dict, Any, Optional

import numpy as np
from PIL import Image, ImageEnhance, ImageFilter

from .output_handler import OutputHandler

logger = logging.getLogger(__name__)


class PosterNotesCreator:
    """
    Uses the original poster as the main image and randomly selects additional
    images to serve as note images. Ensures the selected note images do not
    duplicate any image already used in the poster.
    """

    def __init__(self, output_handler: OutputHandler):
        """
        Initialize the PosterNotesCreator.

        Args:
            output_handler: OutputHandler instance used to persist generated images
        """
        self.output_handler = output_handler
        logger.info("PosterNotesCreator initialized")

    def create_notes_images(
        self,
        run_id: str,
        topic_index: int,
        variant_index: int,
        poster_image_path: str,
        poster_metadata_path: str,
        source_image_dir: str,
        num_additional_images: int,
        output_filename_template: str = "note_{index}.jpg"
    ) -> List[str]:
        """
        Create note images.

        Args:
            run_id: Run ID
            topic_index: Topic index
            variant_index: Variant index
            poster_image_path: Path to the poster image
            poster_metadata_path: Path to the poster metadata file
            source_image_dir: Directory containing the source images
            num_additional_images: Number of additional images to use
            output_filename_template: Template for output filenames

        Returns:
            List[str]: Paths of the saved note images
        """
        # Validate the input paths
        if not os.path.exists(poster_image_path):
            logger.error(f"Poster image does not exist: {poster_image_path}")
            return []

        if not os.path.exists(poster_metadata_path):
            logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
            return []

        if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
            logger.error(f"Source image directory does not exist: {source_image_dir}")
            return []

        # Read the list of already-used images from the metadata file
        try:
            with open(poster_metadata_path, 'r', encoding='utf-8') as f:
                poster_metadata = json.load(f)
        except Exception as e:
            logger.error(f"Failed to read poster metadata: {e}")
            return []

        # Images already used in the poster
        used_images = []
        if 'collage_images' in poster_metadata:
            used_images = poster_metadata['collage_images']
            logger.info(f"Poster already uses {len(used_images)} images: {', '.join(used_images)}")

        # List all image files in the source directory
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        available_images = [
            f for f in os.listdir(source_image_dir)
            if os.path.isfile(os.path.join(source_image_dir, f))
            and f.lower().endswith(image_extensions)
        ]

        if not available_images:
            logger.error(f"No images found in source directory: {source_image_dir}")
            return []

        logger.info(f"Found {len(available_images)} images in source directory")

        # Filter out images already used in the poster
        available_images = [img for img in available_images if img not in used_images]

        if not available_images:
            logger.warning("All images are already used in the poster; cannot create additional notes")
            return []

        logger.info(f"Images available after filtering: {len(available_images)}")

        # If fewer images are available than requested, warn but continue
        if len(available_images) < num_additional_images:
            logger.warning(
                f"Available images ({len(available_images)}) are fewer than the requested "
                f"number of notes ({num_additional_images}); using all available images"
            )
            selected_images = available_images
        else:
            # Randomly select the additional images
            selected_images = random.sample(available_images, num_additional_images)

        logger.info(f"Selected {len(selected_images)} images for notes")

        # Save the selected note images
        saved_paths = []
        for i, image_filename in enumerate(selected_images):
            try:
                # Load the image
                image_path = os.path.join(source_image_dir, image_filename)
                image = Image.open(image_path)

                # Build the output filename
                output_filename = output_filename_template.format(index=i + 1)

                # Build the note metadata
                note_metadata = {
                    "original_image": image_filename,
                    "note_index": i + 1,
                    "source_dir": source_image_dir,
                    "associated_poster": os.path.basename(poster_image_path)
                }

                # Save the image through the output handler
                saved_path = self.output_handler.handle_generated_image(
                    run_id,
                    topic_index,
                    variant_index,
                    'note',  # image type is "note"
                    image,
                    output_filename,
                    note_metadata
                )

                saved_paths.append(saved_path)
                logger.info(f"Saved note image {i + 1}/{len(selected_images)}: {saved_path}")

            except Exception as e:
                logger.error(f"Error while processing image '{image_filename}': {e}")

        return saved_paths

    def create_additional_images(
        self,
        run_id: str,
        topic_index: int,
        variant_index: int,
        poster_metadata_path: str,
        source_image_dir: str,
        num_additional_images: int = 3,
        output_filename_template: str = "additional_{index}.jpg",
        variation_strength: str = "medium",
        extra_effects: bool = True
    ) -> List[str]:
        """Select images not used by the poster as additional illustrations and process them to a 3:4 ratio."""
        logger.info(f"Selecting additional images for topic {topic_index}, variant {variant_index}")

        # Gather candidate images
        candidate_images = self.get_candidate_images(
            poster_metadata_path,
            source_image_dir,
            num_additional_images
        )

        if not candidate_images:
            logger.warning("No suitable candidate images found")
            return []

        # Derive a unique random seed from the run/topic/variant identifiers
        seed_str = f"{run_id}_{topic_index}_{variant_index}"
        seed = sum(ord(c) for c in seed_str)
        logger.info(f"Using random seed {seed}, derived from: {seed_str}")

        # Process the images in parallel across worker processes
        # (the instance, including output_handler, must be picklable for ProcessPoolExecutor)
        saved_paths = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, len(candidate_images))) as executor:
            # Submit one task per candidate image
            future_to_image = {}
            for i, image_filename in enumerate(candidate_images):
                image_path = os.path.join(source_image_dir, image_filename)
                # Give each image its own seed
                image_seed = seed + i
                future = executor.submit(
                    self.process_single_image,
                    run_id,
                    topic_index,
                    variant_index,
                    image_path,
                    image_filename,
                    i,
                    source_image_dir,
                    output_filename_template.format(index=i + 1),
                    image_seed,
                    variation_strength,
                    extra_effects
                )
                future_to_image[future] = (i, image_filename)

            # Collect the results as they complete
            for future in concurrent.futures.as_completed(future_to_image):
                i, image_filename = future_to_image[future]
                try:
                    saved_path = future.result()
                    if saved_path:
                        saved_paths.append(saved_path)
                        logger.info(f"Saved additional image {i + 1}/{len(candidate_images)}: {saved_path}")
                except Exception as e:
                    logger.error(f"Error while processing image '{image_filename}': {e}")
                    logger.error(traceback.format_exc())

        return saved_paths

    def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
        """Return the list of candidate images, excluding those already used in the poster."""
        # Validate the input paths
        if not os.path.exists(poster_metadata_path):
            logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
            return []

        if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
            logger.error(f"Source image directory does not exist: {source_image_dir}")
            return []

        # Read the list of already-used images from the metadata file
        try:
            with open(poster_metadata_path, 'r', encoding='utf-8') as f:
                poster_metadata = json.load(f)
        except Exception as e:
            logger.error(f"Failed to read poster metadata: {e}")
            return []

        # Images already used in the poster
        used_images = []
        if 'collage_images' in poster_metadata:
            used_images = poster_metadata['collage_images']
            logger.info(f"Poster already uses {len(used_images)} images: {', '.join(used_images)}")

        # List all image files in the source directory
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        available_images = [
            f for f in os.listdir(source_image_dir)
            if os.path.isfile(os.path.join(source_image_dir, f))
            and f.lower().endswith(image_extensions)
        ]

        if not available_images:
            logger.error(f"No images found in source directory: {source_image_dir}")
            return []

        logger.info(f"Found {len(available_images)} images in source directory")

        # Filter out images already used in the poster
        available_images = [img for img in available_images if img not in used_images]

        if not available_images:
            logger.warning("All images are already used in the poster; cannot create additional images")
            return []

        logger.info(f"Images available after filtering: {len(available_images)}")

        # If fewer images are available than requested, warn but continue
        if len(available_images) < num_images:
            logger.warning(
                f"Available images ({len(available_images)}) are fewer than the requested "
                f"number of additional images ({num_images}); using all available images"
            )
            selected_images = available_images
        else:
            # Randomly select the additional images
            random.seed(sum(map(ord, ''.join(available_images))))  # keep the selection deterministic
            selected_images = random.sample(available_images, num_images)
            random.seed()  # reset the random seed

        return selected_images

    def process_single_image(
        self,
        run_id,
        topic_index,
        variant_index,
        image_path,
        image_filename,
        index,
        source_dir,
        output_filename,
        seed,
        variation_strength,
        extra_effects
    ):
        """Process a single image; this method can run in a separate worker process."""
        try:
            # Load the image
            image = Image.open(image_path)

            # Process the image to a 3:4 ratio and add subtle variations
            processed_image = self.optimized_process_image(
                image,
                (3, 4),
                add_variation=True,
                seed=seed,
                variation_strength=variation_strength,
                extra_effects=extra_effects
            )

            # Build the metadata
            additional_metadata = {
                "original_image": image_filename,
                "additional_index": index + 1,
                "source_dir": source_dir,
                "is_additional_image": True,
                "processed": True,
                "aspect_ratio": "3:4",
                "variation_applied": True,
                "variation_strength": variation_strength,
                "extra_effects": extra_effects
            }

            # Save the image through the output handler
            return self.output_handler.handle_generated_image(
                run_id,
                topic_index,
                variant_index,
                'additional',  # image type is "additional"
                processed_image,
                output_filename,
                additional_metadata
            )
        except Exception as e:
            logger.error(f"Error while processing image '{image_filename}': {e}")
            logger.error(traceback.format_exc())
            return None

    def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
        """
        Add a perturbation that counters perceptual hashing (pHash)
        by injecting low-frequency noise in a DCT-like block domain.

        Args:
            image: Input image
            intensity: Perturbation strength (0-1)

        Returns:
            The perturbed image
        """
        # Work on a grayscale copy
        gray_image = image.convert('L')
        width, height = gray_image.size

        # Ensure the dimensions are multiples of 8 (DCT typically uses 8x8 blocks)
        new_width = (width // 8) * 8
        new_height = (height // 8) * 8
        if new_width != width or new_height != height:
            gray_image = gray_image.resize((new_width, new_height))

        # Convert to a numpy array
        img_array = np.array(gray_image)

        # Simplified DCT-domain perturbation, processed block by block.
        # A real DCT would use scipy.fftpack; here the low-frequency effect is only approximated.
        for y in range(0, new_height, 8):
            for x in range(0, new_width, 8):
                block = img_array[y:y + 8, x:x + 8].astype(float)

                avg = np.mean(block)  # block mean, stand-in for the DC component

                # Perturb the low-frequency region (excluding the DC component)
                noise_value = random.uniform(-intensity * 10, intensity * 10)

                # Nudge the top-left coefficients (analogous to the low-frequency area of a DCT)
                block[1:3, 1:3] += noise_value

                # Write the block back
                img_array[y:y + 8, x:x + 8] = np.clip(block, 0, 255)

        # Back to a PIL image
        modified_image = Image.fromarray(img_array.astype(np.uint8))

        # Restore the original size if it was adjusted
        if new_width != width or new_height != height:
            modified_image = modified_image.resize((width, height), Image.LANCZOS)

        # Blend the modified grayscale channel back into the original color image
        if image.mode == 'RGB':
            r, g, b = image.split()
            blend_factor = 0.2  # blending strength
            r = Image.blend(r, modified_image, blend_factor)
            g = Image.blend(g, modified_image, blend_factor)
            b = Image.blend(b, modified_image, blend_factor)
            return Image.merge('RGB', (r, g, b))
        else:
            return modified_image

    def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
        """
        Perturb the image's color histogram to counter matching based on color statistics.

        Args:
            image: Input image
            strength: Perturbation strength (0-1)

        Returns:
            The processed image
        """
        # Ensure RGB mode
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Convert to a numpy array
        img_array = np.array(image)
        height, width, channels = img_array.shape

        # Process each channel separately
        for channel in range(channels):
            # Histogram of the current channel
            hist, _ = np.histogram(img_array[:, :, channel].flatten(), bins=64, range=(0, 256))

            # Find the dominant color bins (highest frequencies; roughly the top 30%)
            threshold = np.percentile(hist, 70)
            significant_bins = np.where(hist > threshold)[0]

            if len(significant_bins) > 0:
                for bin_idx in significant_bins:
                    # Color range covered by this bin
                    bin_width = 256 // 64
                    color_low = bin_idx * bin_width
                    color_high = (bin_idx + 1) * bin_width

                    # Mask of pixels within this color range
                    mask = (img_array[:, :, channel] >= color_low) & (img_array[:, :, channel] < color_high)

                    if np.any(mask):
                        # Random offset for this bin
                        offset = int(strength * bin_width * (random.random() - 0.5) * 2)

                        # Apply the offset, clamped to 0-255 (cast to a wider type first
                        # so a negative offset cannot wrap around the uint8 range)
                        shifted = img_array[:, :, channel][mask].astype(np.int16) + offset
                        img_array[:, :, channel][mask] = np.clip(shifted, 0, 255).astype(np.uint8)

        # Back to a PIL image
        return Image.fromarray(img_array)

    def strip_metadata(self, image: Image.Image) -> Image.Image:
        """
        Remove all metadata from an image.

        Args:
            image: Input image

        Returns:
            The image with no metadata
        """
        # Create a metadata-free copy by re-encoding the image in memory
        data = io.BytesIO()
        image.save(data, format=image.format if image.format else 'PNG')
        return Image.open(data)

    def optimized_process_image(
        self,
        image: Image.Image,
        target_ratio: Tuple[int, int],
        add_variation: bool = True,
        seed: int = None,
        variation_strength: str = "medium",
        extra_effects: bool = True
    ) -> Image.Image:
        """Optimized image processing using faster operations, with anti-duplicate-detection tweaks added."""
        # Seed the random generators
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

        # Pick parameters according to the variation strength
        if variation_strength == "low":
            brightness_factor = random.uniform(0.97, 1.03)
            contrast_factor = random.uniform(0.97, 1.03)
            saturation_factor = random.uniform(0.97, 1.03)
            max_rotation = 0.5
            border_size = random.randint(0, 1)
            use_extra = random.random() < 0.3 and extra_effects
        elif variation_strength == "high":
            brightness_factor = random.uniform(0.92, 1.08)
            contrast_factor = random.uniform(0.92, 1.08)
            saturation_factor = random.uniform(0.92, 1.08)
            max_rotation = 2.0
            border_size = random.randint(0, 3)
            use_extra = extra_effects
        else:  # medium
            brightness_factor = random.uniform(0.95, 1.05)
            contrast_factor = random.uniform(0.95, 1.05)
            saturation_factor = random.uniform(0.95, 1.05)
            max_rotation = 1.0
            border_size = random.randint(0, 2)
            use_extra = random.random() < 0.7 and extra_effects

        # Bring the image to the target aspect ratio
        width, height = image.size
        current_ratio = width / height
        target_ratio_value = target_ratio[0] / target_ratio[1]

        # Resize
        if current_ratio > target_ratio_value:
            # The image is wider than the target
            new_height = 1200
            new_width = int(new_height * current_ratio)
        else:
            # The image is taller than the target
            new_width = 900
            new_height = int(new_width / current_ratio)

        # Efficient resize
        resized_image = image.resize((new_width, new_height), Image.LANCZOS)

        # Crop to the target ratio
        resized_width, resized_height = resized_image.size

        if resized_width / resized_height > target_ratio_value:
            crop_width = int(resized_height * target_ratio_value)
            max_offset = max(1, min(10, (resized_width - crop_width) // 10))
            offset = random.randint(-max_offset, max_offset) if add_variation else 0
            crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
            crop_x2 = crop_x1 + crop_width
            result = resized_image.crop((crop_x1, 0, crop_x2, resized_height))
        else:
            crop_height = int(resized_width / target_ratio_value)
            max_offset = max(1, min(10, (resized_height - crop_height) // 10))
            offset = random.randint(-max_offset, max_offset) if add_variation else 0
            crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
            crop_y2 = crop_y1 + crop_height
            result = resized_image.crop((0, crop_y1, resized_width, crop_y2))

        # If no variation is requested, return early
        if not add_variation:
            # Reset the random seeds
            if seed is not None:
                random.seed()
                np.random.seed()
            # Strip metadata before returning
            return self.strip_metadata(result)

        # Apply the basic variations
        processed_image = result.convert('RGB')

        # 1. Brightness adjustment
        if abs(brightness_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Brightness(processed_image)
            processed_image = enhancer.enhance(brightness_factor)

        # 2. Contrast adjustment
        if abs(contrast_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Contrast(processed_image)
            processed_image = enhancer.enhance(contrast_factor)

        # 3. Saturation adjustment
        if abs(saturation_factor - 1.0) > 0.01:
            enhancer = ImageEnhance.Color(processed_image)
            processed_image = enhancer.enhance(saturation_factor)

        # 4. Rotation (only for medium and high strength)
        if variation_strength != "low" and abs(max_rotation) > 0.1:
            rotation_angle = random.uniform(-max_rotation, max_rotation)
            if abs(rotation_angle) > 0.1:  # only rotate when the angle is large enough
                processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
        # 5. New: apply anti-duplicate-detection techniques,
        #    selectively depending on the variation strength
        if use_extra:
            # Randomly decide which anti-duplicate-detection techniques to apply
            apply_phash = random.random() < 0.7
            apply_color = random.random() < 0.7

            # Perceptual-hash perturbation (only for medium and high strength)
            if apply_phash and variation_strength != "low":
                phash_intensity = 0.05 if variation_strength == "medium" else 0.08
                processed_image = self.add_phash_noise(processed_image, phash_intensity)

            # Color-histogram perturbation
            if apply_color:
                color_strength = 0.02 if variation_strength == "low" else \
                    0.04 if variation_strength == "medium" else 0.06
                processed_image = self.perturb_color_histogram(processed_image, color_strength)

        # Apply extra effects (only when enabled)
        if use_extra:
            # Decide which effect to apply based on the strength
            apply_sharpen = random.random() < 0.4
            apply_blur = not apply_sharpen and random.random() < 0.3

            # Sharpening
            if apply_sharpen:
                enhancer = ImageEnhance.Sharpness(processed_image)
                sharpness = 1.2 if variation_strength == "high" else 1.1
                processed_image = enhancer.enhance(sharpness)
            # Blurring
            elif apply_blur:
                radius = 0.7 if variation_strength == "high" else 0.4
                processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))

        # Border handling (only when the image is not too small)
        if border_size > 0 and min(processed_image.size) > 300:
            border_color = (
                random.randint(0, 5),
                random.randint(0, 5),
                random.randint(0, 5)
            )
            w, h = processed_image.size
            bordered = Image.new('RGB', (w + border_size * 2, h + border_size * 2), border_color)
            bordered.paste(processed_image, (border_size, border_size))

            # Randomly crop back to the original size
            offset_x = random.randint(0, border_size * 2)
            offset_y = random.randint(0, border_size * 2)
            processed_image = bordered.crop((offset_x, offset_y, offset_x + w, offset_y + h))

        # 6. Always strip metadata as the final step
        processed_image = self.strip_metadata(processed_image)

        # Reset the random seeds
        if seed is not None:
            random.seed()
            np.random.seed()

        return processed_image


def process_poster_for_notes(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_image_path: str,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
    """
    Process a poster and create its note images.

    Args:
        run_id: Run ID
        topic_index: Topic index
        variant_index: Variant index
        poster_image_path: Path to the poster image
        poster_metadata_path: Path to the poster metadata file
        source_image_dir: Directory containing the source images
        num_additional_images: Number of additional images to use
        output_handler: Output handler
        output_filename_template: Template for output filenames

    Returns:
        List[str]: Paths of the saved note images
    """
    logger.info(f"Creating note images for poster: {poster_image_path}")

    # Validate the input
    if not os.path.exists(poster_image_path):
        logger.error(f"Poster image does not exist: {poster_image_path}")
        return []

    # Create a creator instance and delegate the work
    creator = PosterNotesCreator(output_handler)
    return creator.create_notes_images(
        run_id,
        topic_index,
        variant_index,
        poster_image_path,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template
    )


def select_additional_images(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "additional_{index}.jpg",
    variation_strength: str = "medium",
    extra_effects: bool = True
) -> List[str]:
    """
    Select images not used by the poster as additional illustrations and process them to a 3:4 ratio.
    """
    logger.info(f"Selecting additional images for topic {topic_index}, variant {variant_index}")

    # Validate the input
    if not os.path.exists(poster_metadata_path):
        logger.error(f"Poster metadata does not exist: {poster_metadata_path}")
        return []

    # Create a creator instance
    creator = PosterNotesCreator(output_handler)

    # Process the images using the optimized pipeline
    return creator.create_additional_images(
        run_id,
        topic_index,
        variant_index,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template,
        variation_strength,
        extra_effects
    )
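

# Minimal usage sketch (assumptions): OutputHandler's constructor signature, the
# package/module names, and the command-line paths below are not defined in this
# file, so treat them as placeholders. Because of the relative import at the top,
# run this as a module, e.g. `python -m <your_package>.<this_module> ...`.
if __name__ == "__main__":
    import argparse

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description="Select and process additional images for a poster")
    parser.add_argument("--run-id", default="demo_run")
    parser.add_argument("--poster-metadata", required=True, help="Path to the poster metadata JSON")
    parser.add_argument("--source-dir", required=True, help="Directory containing candidate images")
    parser.add_argument("--num-images", type=int, default=3)
    args = parser.parse_args()

    # Hypothetical construction; adjust to the real OutputHandler API in your project.
    handler = OutputHandler()

    paths = select_additional_images(
        run_id=args.run_id,
        topic_index=0,
        variant_index=0,
        poster_metadata_path=args.poster_metadata,
        source_image_dir=args.source_dir,
        num_additional_images=args.num_images,
        output_handler=handler,
        variation_strength="medium",
        extra_effects=True,
    )
    print(f"Saved {len(paths)} additional images: {paths}")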