import os import random import logging import json from PIL import Image import traceback from typing import List, Tuple, Dict, Any, Optional from .output_handler import OutputHandler logger = logging.getLogger(__name__) class PosterNotesCreator: """ 处理原始海报作为主图,并随机选择额外的图片作为笔记图片。 确保选择的笔记图片与海报中使用的图片不重复。 """ def __init__(self, output_handler: OutputHandler): """ 初始化 PosterNotesCreator Args: output_handler: 可选的 OutputHandler 实例,用于处理输出 """ self.output_handler = output_handler logging.info("PosterNotesCreator 初始化完成") def create_notes_images( self, run_id: str, topic_index: int, variant_index: int, poster_image_path: str, poster_metadata_path: str, source_image_dir: str, num_additional_images: int, output_filename_template: str = "note_{index}.jpg" ) -> List[str]: """ 创建笔记图像 Args: run_id: 运行ID topic_index: 主题索引 variant_index: 变体索引 poster_image_path: 海报图像路径 poster_metadata_path: 海报元数据路径 source_image_dir: 源图像目录 num_additional_images: 要使用的额外图像数量 output_filename_template: 输出文件名模板 Returns: List[str]: 保存的笔记图像路径列表 """ # 检查输入路径是否存在 if not os.path.exists(poster_image_path): logger.error(f"海报图像不存在: {poster_image_path}") return [] if not os.path.exists(poster_metadata_path): logger.error(f"海报元数据不存在: {poster_metadata_path}") return [] if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir): logger.error(f"源图像目录不存在: {source_image_dir}") return [] # 从元数据文件中读取已使用的图像信息 try: with open(poster_metadata_path, 'r', encoding='utf-8') as f: poster_metadata = json.load(f) except Exception as e: logger.error(f"无法读取海报元数据: {e}") return [] # 获取已经在海报中使用的图像 used_images = [] if 'collage_images' in poster_metadata: used_images = poster_metadata['collage_images'] logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}") # 列出源目录中的所有图像文件 image_extensions = ('.jpg', '.jpeg', '.png', '.bmp') available_images = [ f for f in os.listdir(source_image_dir) if os.path.isfile(os.path.join(source_image_dir, f)) and f.lower().endswith(image_extensions) ] if not available_images: logger.error(f"源目录中没有找到图像: {source_image_dir}") return [] logger.info(f"源目录中找到 {len(available_images)} 张图像") # 过滤掉已经在海报中使用的图像 available_images = [img for img in available_images if img not in used_images] if not available_images: logger.warning("所有图像都已在海报中使用,无法创建额外笔记") return [] logger.info(f"过滤后可用图像数量: {len(available_images)}") # 如果可用图像少于请求数量,进行警告但继续处理 if len(available_images) < num_additional_images: logger.warning( f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})," f"将使用所有可用图像" ) selected_images = available_images else: # 随机选择额外图像 selected_images = random.sample(available_images, num_additional_images) logger.info(f"已选择 {len(selected_images)} 张图像作为笔记") # 保存选择的笔记图像 saved_paths = [] for i, image_filename in enumerate(selected_images): try: # 加载图像 image_path = os.path.join(source_image_dir, image_filename) image = Image.open(image_path) # 生成输出文件名 output_filename = output_filename_template.format(index=i+1) # 创建元数据 note_metadata = { "original_image": image_filename, "note_index": i + 1, "source_dir": source_image_dir, "associated_poster": os.path.basename(poster_image_path) } # 使用输出处理器保存图像 saved_path = self.output_handler.handle_generated_image( run_id, topic_index, variant_index, 'note', # 图像类型为note image, output_filename, note_metadata ) saved_paths.append(saved_path) logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}") except Exception as e: logger.error(f"处理图像时出错 '{image_filename}': {e}") return saved_paths def create_additional_images( self, run_id: str, topic_index: int, variant_index: int, poster_metadata_path: str, source_image_dir: str, num_additional_images: int = 3, output_filename_template: str = "additional_{index}.jpg", variation_strength: str = "medium", extra_effects: bool = True ) -> List[str]: """ 选择未被海报使用的图像作为额外配图,并处理为3:4比例 Args: run_id: 运行ID topic_index: 主题索引 variant_index: 变体索引 poster_metadata_path: 海报元数据路径 source_image_dir: 源图像目录 num_additional_images: 要选择的额外图像数量 output_filename_template: 输出文件名模板 variation_strength: 微调强度 - "low", "medium", "high" extra_effects: 是否添加额外效果 Returns: List[str]: 保存的额外配图路径列表 """ logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图") # 检查输入路径是否存在 if not os.path.exists(poster_metadata_path): logger.error(f"海报元数据不存在: {poster_metadata_path}") return [] if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir): logger.error(f"源图像目录不存在: {source_image_dir}") return [] # 从元数据文件中读取已使用的图像信息 try: with open(poster_metadata_path, 'r', encoding='utf-8') as f: poster_metadata = json.load(f) except Exception as e: logger.error(f"无法读取海报元数据: {e}") return [] # 获取已经在海报中使用的图像 used_images = [] if 'collage_images' in poster_metadata: used_images = poster_metadata['collage_images'] logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}") # 列出源目录中的所有图像文件 image_extensions = ('.jpg', '.jpeg', '.png', '.bmp') available_images = [ f for f in os.listdir(source_image_dir) if os.path.isfile(os.path.join(source_image_dir, f)) and f.lower().endswith(image_extensions) ] if not available_images: logger.error(f"源目录中没有找到图像: {source_image_dir}") return [] logger.info(f"源目录中找到 {len(available_images)} 张图像") # 过滤掉已经在海报中使用的图像 available_images = [img for img in available_images if img not in used_images] if not available_images: logger.warning("所有图像都已在海报中使用,无法创建额外配图") return [] logger.info(f"过滤后可用图像数量: {len(available_images)}") # 如果可用图像少于请求数量,进行警告但继续处理 if len(available_images) < num_additional_images: logger.warning( f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_additional_images})," f"将使用所有可用图像" ) selected_images = available_images else: # 生成唯一的随机种子,基于run_id、topic_index和variant_index seed_str = f"{run_id}_{topic_index}_{variant_index}" seed = sum(ord(c) for c in seed_str) logger.info(f"使用随机种子: {seed},基于: {seed_str}") random.seed(seed) # 随机选择额外图像 selected_images = random.sample(available_images, num_additional_images) # 重置随机种子,不影响其他随机操作 random.seed() logger.info(f"已选择 {len(selected_images)} 张图像作为额外配图") logger.info(f"微调强度: {variation_strength}, 额外效果: {'启用' if extra_effects else '禁用'}") # 保存选择的额外配图 saved_paths = [] for i, image_filename in enumerate(selected_images): try: # 加载图像 image_path = os.path.join(source_image_dir, image_filename) image = Image.open(image_path) # 处理图像为3:4比例,并添加微小变化 # 使用不同的种子确保每个图像的变化各不相同 variation_seed = seed + i if 'seed' in locals() else i + 1 processed_image = self.process_image_to_aspect_ratio( image, (3, 4), add_variation=True, seed=variation_seed, variation_strength=variation_strength, extra_effects=extra_effects ) # 生成输出文件名 output_filename = output_filename_template.format(index=i+1) # 创建元数据 additional_metadata = { "original_image": image_filename, "additional_index": i + 1, "source_dir": source_image_dir, "is_additional_image": True, "processed": True, "aspect_ratio": "3:4", "variation_applied": True, "variation_strength": variation_strength, "extra_effects": extra_effects } # 使用输出处理器保存图像 saved_path = self.output_handler.handle_generated_image( run_id, topic_index, variant_index, 'additional', # 图像类型为additional processed_image, output_filename, additional_metadata ) saved_paths.append(saved_path) logger.info(f"已保存额外配图 {i+1}/{len(selected_images)}: {saved_path}") except Exception as e: logger.error(f"处理图像时出错 '{image_filename}': {e}") return saved_paths def process_image_to_aspect_ratio( self, image: Image.Image, target_ratio: Tuple[int, int], add_variation: bool = True, seed: int = None, variation_strength: str = "medium", # 新参数: 微调强度 - "low", "medium", "high" extra_effects: bool = True # 新参数: 是否添加额外效果 ) -> Image.Image: """ 处理图像到指定的宽高比,并添加微小变化 Args: image: 原始图像 target_ratio: 目标宽高比,如(3, 4)表示3:4的比例 add_variation: 是否添加微小变化以避免哈希检测 seed: 随机种子,用于确保变化的可重复性 variation_strength: 微调强度 - "low", "medium", "high" extra_effects: 是否添加额外效果(噪点、微透视变换等) Returns: Image.Image: 处理后的图像 """ # 如果指定了种子,设置随机种子 if seed is not None: random.seed(seed) # 根据微调强度设置参数范围 if variation_strength == "low": brightness_range = (-0.03, 0.03) contrast_range = (-0.03, 0.03) saturation_range = (-0.03, 0.03) hue_range = (-0.01, 0.01) max_crop_px = 3 max_rotation = 0.5 noise_intensity = 0.01 border_size_range = (0, 2) elif variation_strength == "high": brightness_range = (-0.08, 0.08) contrast_range = (-0.08, 0.08) saturation_range = (-0.08, 0.08) hue_range = (-0.02, 0.02) max_crop_px = 8 max_rotation = 2.0 noise_intensity = 0.03 border_size_range = (0, 4) else: # medium (默认) brightness_range = (-0.05, 0.05) contrast_range = (-0.05, 0.05) saturation_range = (-0.05, 0.05) hue_range = (-0.015, 0.015) max_crop_px = 5 max_rotation = 1.0 noise_intensity = 0.02 border_size_range = (0, 3) width, height = image.size current_ratio = width / height target_ratio_value = target_ratio[0] / target_ratio[1] # 不再直接调整尺寸,而是先resize然后进行轻微裁剪 # 第一步:先调整大小,使较短边符合目标尺寸 if current_ratio > target_ratio_value: # 图片较宽 # 先根据高度调整大小 new_height = 1200 # 可以根据需要调整目标尺寸 new_width = int(new_height * current_ratio) else: # 图片较高 # 先根据宽度调整大小 new_width = 900 # 可以根据需要调整目标尺寸 new_height = int(new_width / current_ratio) # 调整尺寸 resized_image = image.resize((new_width, new_height), Image.LANCZOS) # 第二步:计算要裁剪的区域,使最终比例为目标比例 resized_width, resized_height = resized_image.size if resized_width / resized_height > target_ratio_value: # 调整后的图片较宽 # 需要裁剪宽度 crop_width = int(resized_height * target_ratio_value) # 添加微小变化:不完全居中裁剪,而是稍微偏移 if add_variation: max_offset = max(1, min(20, (resized_width - crop_width) // 5)) # 最大偏移量 offset = random.randint(-max_offset, max_offset) else: offset = 0 # 确保裁剪区域在图像内 crop_x1 = (resized_width - crop_width) // 2 + offset crop_x1 = max(0, min(crop_x1, resized_width - crop_width)) crop_x2 = crop_x1 + crop_width result = resized_image.crop((crop_x1, 0, crop_x2, resized_height)) else: # 调整后的图片较高 # 需要裁剪高度 crop_height = int(resized_width / target_ratio_value) # 添加微小变化:不完全居中裁剪,而是稍微偏移 if add_variation: max_offset = max(1, min(20, (resized_height - crop_height) // 5)) # 最大偏移量 offset = random.randint(-max_offset, max_offset) else: offset = 0 # 确保裁剪区域在图像内 crop_y1 = (resized_height - crop_height) // 2 + offset crop_y1 = max(0, min(crop_y1, resized_height - crop_height)) crop_y2 = crop_y1 + crop_height result = resized_image.crop((0, crop_y1, resized_width, crop_y2)) # 如果需要添加微小变化 if add_variation: # 转换为RGB模式进行处理 if result.mode != 'RGB': processed_image = result.convert('RGB') else: processed_image = result.copy() # 1. 微调亮度 brightness_factor = 1.0 + random.uniform(*brightness_range) processed_image = self._adjust_brightness(processed_image, brightness_factor) # 2. 微调对比度 contrast_factor = 1.0 + random.uniform(*contrast_range) processed_image = self._adjust_contrast(processed_image, contrast_factor) # 3. 微调饱和度 saturation_factor = 1.0 + random.uniform(*saturation_range) processed_image = self._adjust_saturation(processed_image, saturation_factor) # 4. 微小裁剪调整 crop_px = random.randint(0, max_crop_px) if crop_px > 0: width, height = processed_image.size processed_image = processed_image.crop((crop_px, crop_px, width-crop_px, height-crop_px)) processed_image = processed_image.resize((width, height), Image.LANCZOS) # 5. 微小旋转 rotation_angle = random.uniform(-max_rotation, max_rotation) processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False) # 6. 额外效果 (如果启用) if extra_effects: # 6.1 添加微弱噪点 processed_image = self._add_noise(processed_image, intensity=noise_intensity) # 6.2 微小色相调整 processed_image = self._adjust_hue(processed_image, shift=random.uniform(*hue_range)) # 6.3 随机边缘微调 - 随机添加1-3像素的边缘 border_size = random.randint(*border_size_range) if border_size > 0: processed_image = self._add_border_and_crop(processed_image, border_size) # 6.4 随机进行细微的锐化或模糊处理 if random.random() > 0.5: processed_image = self._slight_sharpen(processed_image) else: processed_image = self._slight_blur(processed_image) # 重置随机种子,避免影响其他操作 if seed is not None: random.seed() return processed_image else: # 重置随机种子,避免影响其他操作 if seed is not None: random.seed() return result def _adjust_hue(self, image: Image.Image, shift: float) -> Image.Image: """调整图像色相""" if shift == 0.0: return image try: # 使用PIL的色相调整 from PIL import ImageEnhance, ImageOps import colorsys # 获取像素数据 data = list(image.getdata()) new_data = [] for pixel in data: r, g, b = pixel[:3] # 转换为HSV h, s, v = colorsys.rgb_to_hsv(r/255, g/255, b/255) # 调整色相 (H 是 0-1 的值) h = (h + shift) % 1.0 # 转回RGB r, g, b = colorsys.hsv_to_rgb(h, s, v) r = int(r * 255) g = int(g * 255) b = int(b * 255) if len(pixel) > 3: # 如果有alpha通道 new_data.append((r, g, b, pixel[3])) else: new_data.append((r, g, b)) result = Image.new(image.mode, image.size) result.putdata(new_data) return result except (ImportError, AttributeError): # 如果无法使用上述方法,返回原图 return image def _add_noise(self, image: Image.Image, intensity: float = 0.02) -> Image.Image: """添加微弱噪点,intensity控制噪点强度(0-1)""" if intensity <= 0: return image # 获取像素数据 data = list(image.getdata()) new_data = [] for pixel in data: r, g, b = pixel[:3] # 添加随机噪点 noise_r = random.randint(-int(intensity * 255), int(intensity * 255)) noise_g = random.randint(-int(intensity * 255), int(intensity * 255)) noise_b = random.randint(-int(intensity * 255), int(intensity * 255)) r = max(0, min(255, r + noise_r)) g = max(0, min(255, g + noise_g)) b = max(0, min(255, b + noise_b)) if len(pixel) > 3: # 如果有alpha通道 new_data.append((r, g, b, pixel[3])) else: new_data.append((r, g, b)) result = Image.new(image.mode, image.size) result.putdata(new_data) return result def _add_border_and_crop(self, image: Image.Image, border_size: int) -> Image.Image: """添加边框然后裁剪回原尺寸,用于改变边缘像素""" if border_size <= 0: return image width, height = image.size # 创建略大的画布 border_color = ( random.randint(0, 10), random.randint(0, 10), random.randint(0, 10) ) bordered = Image.new(image.mode, (width + border_size*2, height + border_size*2), border_color) bordered.paste(image, (border_size, border_size)) # 随机裁剪回原尺寸 offset_x = random.randint(0, border_size*2) offset_y = random.randint(0, border_size*2) result = bordered.crop((offset_x, offset_y, offset_x + width, offset_y + height)) return result def _slight_sharpen(self, image: Image.Image) -> Image.Image: """轻微锐化图像""" try: from PIL import ImageEnhance enhancer = ImageEnhance.Sharpness(image) return enhancer.enhance(1.2) # 轻微锐化,1.0是原始锐度 except (ImportError, AttributeError): return image def _slight_blur(self, image: Image.Image) -> Image.Image: """轻微模糊图像""" try: from PIL import ImageFilter return image.filter(ImageFilter.GaussianBlur(radius=0.5)) except (ImportError, AttributeError): return image def _adjust_brightness(self, image: Image.Image, factor: float) -> Image.Image: """调整图像亮度""" if factor == 1.0: return image data = list(image.getdata()) new_data = [] for pixel in data: r, g, b = pixel[:3] r = min(255, max(0, int(r * factor))) g = min(255, max(0, int(g * factor))) b = min(255, max(0, int(b * factor))) if len(pixel) > 3: # 如果有alpha通道 new_data.append((r, g, b, pixel[3])) else: new_data.append((r, g, b)) result = Image.new(image.mode, image.size) result.putdata(new_data) return result def _adjust_contrast(self, image: Image.Image, factor: float) -> Image.Image: """调整图像对比度""" if factor == 1.0: return image data = list(image.getdata()) new_data = [] # 计算平均亮度 avg_r, avg_g, avg_b = 0, 0, 0 count = 0 for pixel in data: r, g, b = pixel[:3] avg_r += r avg_g += g avg_b += b count += 1 if count > 0: avg_r //= count avg_g //= count avg_b //= count # 调整对比度 for pixel in data: r, g, b = pixel[:3] r = min(255, max(0, int(avg_r + (r - avg_r) * factor))) g = min(255, max(0, int(avg_g + (g - avg_g) * factor))) b = min(255, max(0, int(avg_b + (b - avg_b) * factor))) if len(pixel) > 3: # 如果有alpha通道 new_data.append((r, g, b, pixel[3])) else: new_data.append((r, g, b)) result = Image.new(image.mode, image.size) result.putdata(new_data) return result def _adjust_saturation(self, image: Image.Image, factor: float) -> Image.Image: """调整图像饱和度""" if factor == 1.0: return image # 转换为HSV色彩空间,调整S通道,然后转回RGB try: # 使用内部方法,效率更高 from PIL import ImageEnhance enhancer = ImageEnhance.Color(image) return enhancer.enhance(factor) except ImportError: # 如果PIL没有提供相关功能,使用自定义实现 data = list(image.getdata()) new_data = [] for pixel in data: r, g, b = pixel[:3] # 计算灰度值 gray = (r + g + b) // 3 # 调整饱和度 r = min(255, max(0, int(gray + (r - gray) * factor))) g = min(255, max(0, int(gray + (g - gray) * factor))) b = min(255, max(0, int(gray + (b - gray) * factor))) if len(pixel) > 3: # 如果有alpha通道 new_data.append((r, g, b, pixel[3])) else: new_data.append((r, g, b)) result = Image.new(image.mode, image.size) result.putdata(new_data) return result def process_poster_for_notes( run_id: str, topic_index: int, variant_index: int, poster_image_path: str, poster_metadata_path: str, source_image_dir: str, num_additional_images: int, output_handler: OutputHandler, output_filename_template: str = "note_{index}.jpg" ) -> List[str]: """ 处理海报并创建笔记图像 Args: run_id: 运行ID topic_index: 主题索引 variant_index: 变体索引 poster_image_path: 海报图像路径 poster_metadata_path: 海报元数据路径 source_image_dir: 源图像目录 num_additional_images: 要使用的额外图像数量 output_handler: 输出处理器 output_filename_template: 输出文件名模板 Returns: List[str]: 保存的笔记图像路径列表 """ logger.info(f"开始为海报创建笔记图像: {poster_image_path}") # 验证输入 if not os.path.exists(poster_image_path): logger.error(f"海报图像不存在: {poster_image_path}") return [] # 创建处理器实例并处理 creator = PosterNotesCreator(output_handler) return creator.create_notes_images( run_id, topic_index, variant_index, poster_image_path, poster_metadata_path, source_image_dir, num_additional_images, output_filename_template ) def select_additional_images( run_id: str, topic_index: int, variant_index: int, poster_metadata_path: str, source_image_dir: str, num_additional_images: int, output_handler: OutputHandler, output_filename_template: str = "additional_{index}.jpg", variation_strength: str = "medium", extra_effects: bool = True ) -> List[str]: """ 选择未被海报使用的图像作为额外配图,并处理为3:4比例 Args: run_id: 运行ID topic_index: 主题索引 variant_index: 变体索引 poster_metadata_path: 海报元数据路径 source_image_dir: 源图像目录 num_additional_images: 要选择的额外图像数量 output_handler: 输出处理器 output_filename_template: 输出文件名模板 variation_strength: 微调强度 - "low", "medium", "high" extra_effects: 是否添加额外效果 Returns: List[str]: 保存的额外配图路径列表 """ logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图") # 验证输入 if not os.path.exists(poster_metadata_path): logger.error(f"海报元数据不存在: {poster_metadata_path}") return [] # 创建处理器实例并处理 creator = PosterNotesCreator(output_handler) return creator.create_additional_images( run_id, topic_index, variant_index, poster_metadata_path, source_image_dir, num_additional_images, output_filename_template, variation_strength, extra_effects )