TravelContentCreator/poster_template.py

3422 lines
150 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import logging
import random
import traceback
import simplejson as json
from datetime import datetime
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.ai_agent import AI_Agent
import os
import logging
from PIL import Image
import numpy as np
from typing import Tuple, Union, Optional
import psutil
import gc # 添加垃圾回收模块
import os
import random
import traceback
import math
from pathlib import Path
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageOps
import logging # Import logging module
import os
import random
import logging
import json
from PIL import Image, ImageChops
import traceback
from typing import List, Tuple, Dict, Any, Optional
import concurrent.futures
import numpy as np
from PIL import ImageEnhance, ImageFilter
from .output_handler import OutputHandler
import io
import math
from simple_collage import process_directory as process_collage
# Try to import scipy; flag availability so callers can degrade gracefully.
try:
    from scipy.fftpack import dct, idct
except ImportError:
    dct = None
    idct = None
    SCIPY_AVAILABLE = False
else:
    SCIPY_AVAILABLE = True

logger = logging.getLogger(__name__)
class ContentGenerator:
"""
海报文本内容生成器
使用AI_Agent代替直接管理OpenAI客户端简化代码结构
"""
def __init__(self,
output_dir="/root/autodl-tmp/poster_generate_result",
model_name="qwenQWQ",
base_url="http://localhost:8000/v1",
api_key="EMPTY",
temperature=0.7,
top_p=0.8,
presence_penalty=1.2):
"""
初始化内容生成器
参数:
output_dir: 输出结果保存目录
temperature: 生成温度参数
top_p: top_p参数
presence_penalty: 惩罚参数
"""
self.output_dir = output_dir
self.temperature = temperature
self.top_p = top_p
self.presence_penalty = presence_penalty
self.add_description = ""
self.model_name = model_name
self.base_url = base_url
self.api_key = api_key
# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger(__name__)
def load_infomation(self, info_directory_path):
"""
加载额外描述文件
参数:
info_directory_path: 信息目录路径列表
"""
self.add_description = "" # 重置描述文本
for path in info_directory_path:
try:
with open(path, "r", encoding="utf-8") as f:
self.add_description += f.read()
self.logger.info(f"成功加载描述文件: {path}")
except Exception as e:
self.logger.warning(f"加载描述文件失败: {path}, 错误: {e}")
self.add_description = ""
def split_content(self, content):
    """
    Extract JSON content from a raw model response, stripping any
    surrounding ```json ... ``` fences or prose.

    Tries, in order: direct JSON parse, fenced-code-block extraction,
    regex extraction of a JSON array, bracket-substring extraction, and
    extraction of individual ``{...}`` objects.

    Args:
        content: raw response text to parse

    Returns:
        Parsed JSON (list/dict), or the stripped original string when no
        structured content could be recovered.
    """
    try:
        # Log a short prefix of the raw content for debugging.
        self.logger.debug(f"解析内容原始内容前200字符: {content[:200]}")
        # First, try parsing the whole content directly in case it is
        # already clean JSON.
        try:
            parsed_data = json.loads(content)
            if isinstance(parsed_data, list):
                # A list: accept it as soon as one element looks like a
                # poster config (has 'main_title' or 'texts').
                for item in parsed_data:
                    if isinstance(item, dict) and ('main_title' in item or 'texts' in item):
                        self.logger.info("成功直接解析为JSON格式列表符合预期结构")
                        return parsed_data
                # No conforming element; a list of strings is still
                # returned so downstream repair code can handle it.
                if len(parsed_data) > 0 and isinstance(parsed_data[0], str):
                    self.logger.warning(f"解析到JSON列表但内容是字符串列表: {parsed_data}")
                    return parsed_data
                self.logger.warning("解析到JSON列表但结构不符合预期")
            elif isinstance(parsed_data, dict) and ('main_title' in parsed_data or 'texts' in parsed_data):
                # A single dict matching the expected structure.
                self.logger.info("成功直接解析为JSON字典符合预期结构")
                return parsed_data
            # Structure not as expected — return it anyway and let the
            # downstream validation/repair step deal with it.
            self.logger.warning(f"解析到JSON但结构不完全符合预期: {parsed_data}")
            return parsed_data
        except json.JSONDecodeError:
            # Not valid JSON as-is; fall through to extraction attempts.
            self.logger.debug("直接JSON解析失败尝试提取结构化内容")
        # Common case: content between ```json and ``` fences.
        if "```json" in content:
            json_str = content.split("```json")[1].split("```")[0].strip()
            try:
                parsed_json = json.loads(json_str)
                self.logger.info("成功从```json```代码块提取JSON")
                return parsed_json
            except json.JSONDecodeError as e:
                self.logger.warning(f"从```json```提取的内容解析失败: {e}, 尝试其他方法")
        # Fallback 1: regex for a [...] array that contains at least one
        # {...} object.
        import re
        json_pattern = r'(\[(?:\s*\{.*?\}\s*,?)+\s*\])'  # strict: requires {...} objects inside []
        json_matches = re.findall(json_pattern, content, re.DOTALL)
        for match in json_matches:
            try:
                result = json.loads(match)
                if isinstance(result, list) and len(result) > 0:
                    # Validate the structure of the extracted array.
                    for item in result:
                        if isinstance(item, dict) and ('main_title' in item or 'texts' in item):
                            self.logger.info("成功从正则表达式提取JSON数组")
                            return result
                    self.logger.warning("从正则表达式提取的JSON数组不符合预期结构")
            except Exception as e:
                self.logger.warning(f"解析正则匹配的内容失败: {e}")
                continue
        # Fallback 2: substring between the first '[' and the last ']'.
        content = content.strip()
        square_bracket_start = content.find('[')
        square_bracket_end = content.rfind(']')
        if square_bracket_start != -1 and square_bracket_end != -1 and square_bracket_end > square_bracket_start:
            potential_json = content[square_bracket_start:square_bracket_end + 1]
            try:
                result = json.loads(potential_json)
                if isinstance(result, list):
                    self.logger.info(f"成功从方括号内容提取列表: {result}")
                    return result
            except Exception as e:
                self.logger.warning(f"尝试提取方括号内容失败: {e}")
        # Last resort: collect every parseable {...} object that looks
        # like a poster config.
        json_structures = re.findall(r'({.*?})', content, re.DOTALL)
        if json_structures:
            items = []
            for i, struct in enumerate(json_structures):
                try:
                    item = json.loads(struct)
                    if isinstance(item, dict) and ('main_title' in item or 'texts' in item):
                        items.append(item)
                except Exception as e:
                    self.logger.warning(f"解析可能的JSON结构 {i+1} 失败: {e}")
                    continue
            if items:
                self.logger.info(f"成功从文本中提取 {len(items)} 个JSON对象")
                return items
        # No JSON at all — the content may be a plain tag string
        # (contains '|' or known tag words); return it as-is.
        if "|" in content or "必打卡" in content or "性价比" in content:
            self.logger.warning(f"无法提取标准JSON但发现可能的标签字符串: {content}")
            return content.strip()
        # Everything failed — raise (caught below, which returns the raw text).
        self.logger.error(f"无法解析内容,返回原始文本: {content[:200]}...")
        raise ValueError("无法从响应中提取有效的 JSON 格式")
    except Exception as e:
        # Catch-all (including the ValueError above): log and return the
        # stripped original so downstream validation can try to repair it.
        self.logger.error(f"解析内容时出错: {e}")
        self.logger.debug(f"原始内容: {content[:200]}...")  # only first 200 chars
        return content.strip()
def _preprocess_for_json(self, text):
"""预处理文本,将换行符转换为\\n形式保证JSON安全"""
if not isinstance(text, str):
return text
# 将所有实际换行符替换为\\n字符串
return text.replace('\n', '\\n').replace('\r', '\\r')
def generate_posters(self, poster_num, content_data_list, system_prompt=None,
                     api_url=None, model_name=None, api_key=None, timeout=120, max_retries=3):
    """
    Generate poster text configurations via the AI agent.

    Args:
        poster_num: number of poster configs to generate
        content_data_list: content data (list of dicts/strings, or a string)
        system_prompt: optional system prompt override
        api_url: optional API base URL override
        model_name: optional model name override
        api_key: optional API key override
        timeout: request timeout passed to the agent
        max_retries: retry count passed to the agent

    Returns:
        str: the raw JSON response string (or fallback content on failure).
    """
    # Apply any API overrides.
    if api_url:
        self.base_url = api_url
    if model_name:
        self.model_name = model_name
    if api_key:
        self.api_key = api_key
    # Use the caller's prompt, a previously-set one, or the built-in default.
    if system_prompt:
        self.system_prompt = system_prompt
    elif not getattr(self, "system_prompt", None):
        # BUGFIX: use getattr — self.system_prompt may never have been
        # assigned, so reading it directly raised AttributeError.
        self.system_prompt = """你是一名专业的旅游景点海报文案创作专家。你的任务是根据提供的旅游景点信息和推文内容生成海报文案配置。你的回复必须是一个JSON数组每一项表示一个海报配置包含'index''main_title''texts'三个字段,其中'texts'是一个字符串数组。海报文案要简洁有力,突出景点特色和吸引力。"""
    # Flatten the content data into one prompt-safe text blob.
    tweet_content = ""
    if isinstance(content_data_list, list):
        for item in content_data_list:
            if isinstance(item, dict):
                # Escape newlines in title/content before embedding.
                title = self._preprocess_for_json(item.get('title', ''))
                content = self._preprocess_for_json(item.get('content', ''))
                tweet_content += f"<title>\n{title}\n</title>\n<content>\n{content}\n</content>\n\n"
            elif isinstance(item, str):
                tweet_content += self._preprocess_for_json(item) + "\n\n"
    elif isinstance(content_data_list, str):
        tweet_content = self._preprocess_for_json(content_data_list)
    # Build the user prompt, including the attraction description if loaded.
    if self.add_description:
        processed_description = self._preprocess_for_json(self.add_description)
        user_content = f"""
以下是需要你处理的信息:
关于景点的描述:
{processed_description}
推文内容:
{tweet_content}
请根据这些信息,生成{poster_num}个海报文案配置以JSON数组格式返回。
"""
    else:
        user_content = f"""
以下是需要你处理的推文内容:
{tweet_content}
请根据这些信息,生成{poster_num}个海报文案配置以JSON数组格式返回。
"""
    self.logger.info(f"正在生成{poster_num}个海报文案配置")
    # Create the AI agent for this request.
    ai_agent = AI_Agent(
        self.base_url,
        self.model_name,
        self.api_key,
        timeout=timeout,
        max_retries=max_retries,
        stream_chunk_timeout=30  # per-chunk timeout for streaming responses
    )
    full_response = ""
    try:
        self.logger.info(f"调用AI生成海报配置模型: {self.model_name}")
        full_response, tokens, time_cost = ai_agent.work(
            self.system_prompt,
            user_content,
            "",  # no history
            self.temperature,
            self.top_p,
            self.presence_penalty
        )
        self.logger.info(f"AI生成完成耗时: {time_cost:.2f}s, 预估令牌数: {tokens}")
        if not full_response:
            self.logger.warning("AI返回空响应使用备用内容")
            full_response = self._generate_fallback_content(poster_num)
    except Exception as e:
        self.logger.exception(f"AI生成过程发生错误: {e}")
        full_response = self._generate_fallback_content(poster_num)
    finally:
        # Always release the agent's resources.
        ai_agent.close()
    return full_response
def _generate_fallback_content(self, poster_num):
"""生成备用内容当API调用失败时使用"""
self.logger.info("生成备用内容")
default_configs = []
for i in range(poster_num):
default_configs.append({
"index": i + 1,
"main_title": "",
"texts": ["", ""]
})
return json.dumps(default_configs, ensure_ascii=False)
def save_result(self, full_response, custom_output_dir=None):
    """
    Parse, validate and persist a generated response to a timestamped
    JSON file.

    Args:
        full_response: raw response text from generation
        custom_output_dir: optional directory overriding self.output_dir

    Returns:
        Path of the written result file (a ``*_fallback.json`` file when
        parsing/saving the real data failed).
    """
    # Timestamp used for the output file name.
    date_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_dir = custom_output_dir or self.output_dir
    try:
        # Extract JSON from the raw response, then repair its structure.
        parsed_data = self.split_content(full_response)
        validated_data = self._validate_and_fix_data(parsed_data)
        result_path = os.path.join(output_dir, f"{date_time}.json")
        os.makedirs(os.path.dirname(result_path), exist_ok=True)
        with open(result_path, "w", encoding="utf-8") as f:
            # NOTE(review): ignore_nan is a simplejson-only keyword, but the
            # module does `import simplejson as json` followed later by
            # `import json`, so the stdlib json likely wins and this call
            # would raise TypeError — confirm which import is in effect.
            json.dump(validated_data, f, ensure_ascii=False, indent=4, ignore_nan=True)
        self.logger.info(f"结果已保存到: {result_path}")
        return result_path
    except Exception as e:
        self.logger.error(f"保存结果到文件时出错: {e}")
        # Fall back to writing a minimal default configuration.
        fallback_data = [{"main_title": "", "texts": ["", ""], "index": 1}]
        result_path = os.path.join(output_dir, f"{date_time}_fallback.json")
        os.makedirs(os.path.dirname(result_path), exist_ok=True)
        with open(result_path, "w", encoding="utf-8") as f:
            json.dump(fallback_data, f, ensure_ascii=False, indent=4, ignore_nan=True)
        self.logger.info(f"出错后已保存备用数据到: {result_path}")
        return result_path
def _validate_and_fix_data(self, data):
"""
验证并修复从AI返回的数据确保其符合期望的结构
Args:
data: 需要验证的数据
Returns:
list: 修复后的数据列表
"""
fixed_data = []
self.logger.info(f"验证并修复数据: {type(data)}")
# 尝试处理字符串类型 (通常是JSON字符串)
if isinstance(data, str):
try:
# 尝试将字符串解析为JSON对象
parsed_data = json.loads(data)
# 递归调用本函数处理解析后的数据
return self._validate_and_fix_data(parsed_data)
except json.JSONDecodeError as e:
self.logger.warning(f"JSON解析失败: {e}")
# 可以选择尝试清理和再次解析
try:
# 寻找字符串中第一个 [ 和最后一个 ] 之间的内容
start_idx = data.find('[')
end_idx = data.rfind(']')
if start_idx >= 0 and end_idx > start_idx:
json_part = data[start_idx:end_idx+1]
self.logger.info(f"尝试从字符串中提取JSON部分: {json_part[:100]}...")
parsed_data = json.loads(json_part)
return self._validate_and_fix_data(parsed_data)
except:
self.logger.warning("无法从字符串中提取有效的JSON部分")
fixed_data.append({
"index": 1,
"main_title": self._preprocess_for_json("默认标题"), # 应用预处理
"texts": [self._preprocess_for_json("默认副标题1"), self._preprocess_for_json("默认副标题2")] # 应用预处理
})
# 处理列表类型
elif isinstance(data, list):
for idx, item in enumerate(data):
# 如果是字典,检查必须字段
if isinstance(item, dict):
fixed_item = {}
# 设置索引
fixed_item["index"] = item.get("index", idx + 1)
# 处理主标题
if "main_title" in item and item["main_title"]:
# 应用预处理,确保所有换行符被正确转义
fixed_item["main_title"] = self._preprocess_for_json(item["main_title"])
else:
fixed_item["main_title"] = "默认标题"
# 处理文本列表
if "texts" in item and isinstance(item["texts"], list) and len(item["texts"]) > 0:
# 对文本列表中的每个元素应用预处理
fixed_item["texts"] = [self._preprocess_for_json(text) if text else "" for text in item["texts"]]
# 确保至少有两个元素
while len(fixed_item["texts"]) < 2:
fixed_item["texts"].append("")
else:
fixed_item["texts"] = ["默认副标题1", "默认副标题2"]
fixed_data.append(fixed_item)
# 如果是字符串,转换为默认格式
elif isinstance(item, str):
fixed_data.append({
"index": idx + 1,
"main_title": self._preprocess_for_json(item), # 应用预处理
"texts": ["", ""]
})
# 其他类型,使用默认值
else:
fixed_data.append({
"index": idx + 1,
"main_title": "默认标题",
"texts": ["", ""]
})
# 处理字典类型 (单个配置项)
elif isinstance(data, dict):
# 处理主标题
main_title = self._preprocess_for_json(data.get("main_title", "默认标题")) # 应用预处理
# 处理文本列表
texts = []
if "texts" in data and isinstance(data["texts"], list):
texts = [self._preprocess_for_json(text) if text else "" for text in data["texts"]] # 应用预处理
# 确保文本列表至少有两个元素
while len(texts) < 2:
texts.append("")
fixed_data.append({
"index": data.get("index", 1),
"main_title": main_title,
"texts": texts
})
# 如果数据是其他格式
else:
self.logger.warning(f"数据格式不支持: {type(data)},将使用默认值")
fixed_data.append({
"index": 1,
"main_title": "",
"texts": ["", ""]
})
# 确保至少有一个配置项
if not fixed_data:
fixed_data.append({
"index": 1,
"main_title": "",
"texts": ["", ""]
})
self.logger.info(f"修复后的数据: {fixed_data}")
return fixed_data
def run(self, info_directory, poster_num, content_data, system_prompt=None,
        api_url="http://localhost:8000/v1", model_name="qwenQWQ", api_key="EMPTY", timeout=120):
    """
    Run the full poster-content generation pipeline and return the
    resulting configuration data.

    Args:
        info_directory: list of description-file paths (e.g. ['/path/to/description.txt'])
        poster_num: number of poster configs to generate
        content_data: article content used for generation (string or list of dicts)
        system_prompt: optional system prompt (None uses the built-in one)
        api_url: API base URL
        model_name: model name to use
        api_key: API key
        timeout: request timeout in seconds

    Returns:
        list | dict | None: generated poster config data (usually a list);
        None when generation returned an empty/invalid response. On an
        unexpected exception, a list of empty default configs is returned.
    """
    try:
        # Load the attraction description file(s).
        self.load_infomation(info_directory)
        # Generate the raw response text.
        self.logger.info(f"开始生成海报内容,数量: {poster_num}")
        full_response = self.generate_posters(
            poster_num,
            content_data,
            system_prompt,
            api_url,
            model_name,
            api_key,
            timeout=timeout,
        )
        # Bail out when generation produced nothing usable.
        if not isinstance(full_response, str) or not full_response.strip():
            self.logger.error("海报内容生成失败或返回空响应")
            return None
        # Extract JSON from the raw response, then repair the structure.
        result_data = self.split_content(full_response)
        fixed_data = self._validate_and_fix_data(result_data)
        self.logger.info(f"成功生成并修复海报配置数据,包含 {len(fixed_data) if isinstance(fixed_data, list) else 1} 个项目")
        return fixed_data
    except Exception as e:
        self.logger.exception(f"海报内容生成过程中发生错误: {e}")
        traceback.print_exc()
        # On failure, fall back to empty default configs so callers can proceed.
        self.logger.info("创建默认海报配置数据")
        default_configs = []
        for i in range(poster_num):
            default_configs.append({
                "index": i + 1,
                "main_title": "",
                "texts": ["", ""]
            })
        return default_configs
def set_temperature(self, temperature):
    """Set the sampling temperature."""
    self.temperature = temperature
def set_top_p(self, top_p):
    """Set the nucleus-sampling (top_p) parameter."""
    self.top_p = top_p
def set_presence_penalty(self, presence_penalty):
    """Set the presence-penalty parameter."""
    self.presence_penalty = presence_penalty
def set_model_para(self, temperature, top_p, presence_penalty):
"""一次性设置所有模型参数"""
self.temperature = temperature
self.top_p = top_p
self.presence_penalty = presence_penalty
class ImageProcessor:
"""
图像处理工具类,提供智能图像加载和压缩功能
"""
def __init__(self, max_memory_percent=80, target_max_pixels=4000000):
"""
初始化图像处理器
Args:
max_memory_percent: 最大内存使用百分比,超过此值将更激进地压缩图像
target_max_pixels: 目标最大像素数(宽×高),用于控制图像大小
"""
self.max_memory_percent = max_memory_percent
self.target_max_pixels = target_max_pixels
self.logger = logging.getLogger(self.__class__.__name__)
# 记录已加载的图像,用于后续清理
self.loaded_images = []
def get_memory_usage(self) -> Tuple[float, float]:
    """Return (used-memory percent, available memory in MB)."""
    vm = psutil.virtual_memory()
    return vm.percent, vm.available / (1024 * 1024)
def check_memory_pressure(self) -> bool:
    """
    Check for memory pressure; run garbage collection when usage exceeds
    the configured threshold or less than 100MB remains available.

    Returns:
        True when pressure was detected (and GC was triggered).
    """
    used_pct, free_mb = self.get_memory_usage()
    pressured = used_pct > self.max_memory_percent or free_mb < 100
    if pressured:
        self.logger.warning(f"内存压力过大: 使用率 {used_pct:.1f}%, 可用 {free_mb:.1f}MB执行垃圾回收")
        self.force_garbage_collection()
    return pressured
def force_garbage_collection(self):
    """Release tracked images, run several GC passes, and log memory state."""
    # Drop references to every image we handed out.
    self.release_loaded_images()
    # Several passes help collect objects freed by earlier passes.
    for _ in range(3):
        gc.collect()
    used_pct, free_mb = self.get_memory_usage()
    self.logger.info(f"垃圾回收后内存状态: 使用率 {used_pct:.1f}%, 可用 {free_mb:.1f}MB")
def release_loaded_images(self):
"""
释放所有已加载的图像资源
"""
for img in self.loaded_images:
try:
if hasattr(img, 'close') and callable(img.close):
img.close()
except Exception as e:
self.logger.error(f"释放图像资源失败: {e}")
# 清空列表
self.loaded_images = []
self.logger.info("已释放所有已加载的图像资源")
def calculate_resize_factor(self, width: int, height: int) -> float:
"""
根据图像尺寸计算压缩因子
Args:
width: 图像宽度
height: 图像高度
Returns:
压缩因子 (0.1-1.0)
"""
# 计算原始像素数
original_pixels = width * height
# 如果小于目标像素数,不需要压缩
if original_pixels <= self.target_max_pixels:
return 1.0
# 计算基础压缩因子
base_factor = (self.target_max_pixels / original_pixels) ** 0.5
# 检查内存使用情况
mem_percent, available_mb = self.get_memory_usage()
# 根据内存使用情况调整压缩因子
if mem_percent > self.max_memory_percent:
# 内存紧张,增加压缩率
memory_factor = 1.0 - ((mem_percent - self.max_memory_percent) / 20)
memory_factor = max(0.5, memory_factor) # 至少保留50%质量
base_factor *= memory_factor
self.logger.warning(f"内存使用率高 ({mem_percent:.1f}%),增加压缩率,压缩因子调整为 {base_factor:.2f}")
# 确保压缩因子在合理范围内
return max(0.1, min(1.0, base_factor))
def smart_load_image(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> Image.Image:
    """
    Load an image, automatically downscaling based on memory pressure and
    image size. Loaded images are tracked in ``self.loaded_images``.

    Args:
        image_path: path to the image file
        target_size: explicit output size; when given, the image is
            resized straight to it

    Returns:
        The loaded (possibly resized) PIL image; a gray placeholder image
        when the file is missing or loading fails.
    """
    # Trigger GC first if memory is already tight.
    self.check_memory_pressure()
    try:
        if not os.path.exists(image_path):
            self.logger.error(f"图像文件不存在: {image_path}")
            # Missing file: return a blank gray placeholder.
            return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
        # File size in MB, used to decide on a conservative load path.
        file_size_mb = os.path.getsize(image_path) / (1024 * 1024)
        if file_size_mb > 10:  # files larger than 10MB
            mem_percent, available_mb = self.get_memory_usage()
            self.logger.info(f"大文件 ({file_size_mb:.1f}MB), 内存使用率: {mem_percent:.1f}%, 可用: {available_mb:.1f}MB")
            if mem_percent > 90 or available_mb < file_size_mb * 5:
                # Memory is tight: fall back to the conservative loader.
                self.logger.warning(f"内存不足,使用保守加载方式")
                result = self._conservative_load(image_path, target_size)
                self.loaded_images.append(result)
                return result
        # Regular load path.
        with Image.open(image_path) as img:
            original_width, original_height = img.size
            self.logger.debug(f"原始图像尺寸: {original_width}x{original_height}")
            # Explicit target size: resize directly.
            if target_size:
                result = img.resize(target_size, Image.Resampling.LANCZOS)
                self.loaded_images.append(result)
                return result
            # Otherwise compute a factor from the pixel budget / memory state.
            resize_factor = self.calculate_resize_factor(original_width, original_height)
            if resize_factor < 1.0:
                new_width = int(original_width * resize_factor)
                new_height = int(original_height * resize_factor)
                self.logger.info(f"压缩图像 {image_path}{original_width}x{original_height}{new_width}x{new_height}")
                result = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                self.loaded_images.append(result)
                return result
            else:
                # No compression needed: return a copy (detached from the file).
                result = img.copy()
                self.loaded_images.append(result)
                return result
    except Exception as e:
        self.logger.error(f"加载图像 {image_path} 失败: {e}")
        # Any failure yields a gray placeholder (note: not tracked in
        # self.loaded_images, unlike the success paths).
        return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
def _conservative_load(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> Image.Image:
    """
    Conservatively load a large image: read only the header to get its
    size, then decode the pixels at reduced resolution via thumbnail().

    Args:
        image_path: path to the image file
        target_size: explicit target size; when omitted, a conservative
            factor (~2 megapixels budget, at most 0.5) is derived

    Returns:
        The loaded, downscaled PIL image, or a gray placeholder on failure.
    """
    try:
        # First pass: open lazily just to read dimensions (no pixel decode).
        # (Removed the unused `format = img.format` local, which also
        # shadowed the `format` builtin.)
        with Image.open(image_path) as img:
            original_width, original_height = img.size
        if target_size:
            new_width, new_height = target_size
        else:
            # Very conservative factor: cap at 50% and ~2M pixels.
            pixels = original_width * original_height
            conservative_factor = min(0.5, (2000000 / pixels) ** 0.5)
            new_width = int(original_width * conservative_factor)
            new_height = int(original_height * conservative_factor)
        # Second pass: thumbnail() decodes at reduced size, saving memory.
        with Image.open(image_path) as img:
            img.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
            # Copy so the underlying file handle can be released.
            result = img.copy()
        self.logger.info(f"保守加载图像 {image_path}{original_width}x{original_height}{new_width}x{new_height}")
        return result
    except Exception as e:
        self.logger.error(f"保守加载图像失败: {e}")
        return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
def batch_process_directory(self, directory: str, max_images: int = 100) -> list:
"""
批量处理目录中的图像,返回处理后的图像列表
Args:
directory: 图像目录
max_images: 最大处理图像数量
Returns:
处理后的PIL图像对象列表
"""
if not os.path.exists(directory):
self.logger.error(f"目录不存在: {directory}")
return []
# 获取所有图像文件
image_files = []
for root, _, files in os.walk(directory):
for file in files:
if file.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp')):
image_files.append(os.path.join(root, file))
# 限制图像数量
if len(image_files) > max_images:
self.logger.warning(f"图像文件过多 ({len(image_files)}), 限制为 {max_images}")
image_files = image_files[:max_images]
# 批量加载图像
images = []
for image_file in image_files:
img = self.smart_load_image(image_file)
if img:
images.append(img)
# 每处理5张图片检查一次内存压力
if len(images) % 5 == 0:
self.check_memory_pressure()
self.logger.info(f"已处理 {len(images)}/{len(image_files)} 个图像")
return images
def save_optimized_image(self, image: Image.Image, output_path: str, quality: int = 85) -> bool:
    """
    Save an image with optimization enabled.

    Args:
        image: PIL image to save
        output_path: destination path (directories are created as needed)
        quality: JPEG quality (1-100)

    Returns:
        True on success, False on any failure.
    """
    try:
        os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
        # JPEG has no alpha channel; drop it before saving RGBA as .jpg/.jpeg.
        lowered = output_path.lower()
        if image.mode == 'RGBA' and (lowered.endswith('.jpg') or lowered.endswith('.jpeg')):
            image = image.convert('RGB')
        image.save(output_path, quality=quality, optimize=True)
        self.logger.info(f"图像已保存至: {output_path}")
        # Saving may have allocated; re-check memory afterwards.
        self.check_memory_pressure()
        return True
    except Exception as exc:
        self.logger.error(f"保存图像失败: {exc}")
        return False
class ImageCollageCreator:
def __init__(self, ):
"""初始化拼图创建器"""
# 定义可用拼接样式
self.collage_styles = [
"grid_2x2", # 标准2x2网格
# "asymmetric", # 非对称布局
# "filmstrip", # 胶片条布局
# "circles", # 圆形布局
"overlap", # 重叠风格
# "mosaic", # 马赛克风格 3x3
"fullscreen", # 全覆盖拼图样式
"vertical_stack" # 新增:上下拼图样式
# "polaroid", # 宝丽来风格
]
def resize_and_crop(self, img, target_size):
    """Scale *img* to cover *target_size*, then center-crop to exactly that size."""
    src_w, src_h = img.size
    dst_w, dst_h = target_size
    if src_w / src_h > dst_w / dst_h:
        # Source is wider than the target: match heights, crop excess width.
        scaled_w = int(src_w * dst_h / src_h)
        img = img.resize((scaled_w, dst_h), Image.Resampling.LANCZOS)
        offset = (scaled_w - dst_w) // 2
        return img.crop((offset, 0, offset + dst_w, dst_h))
    # Source is taller (or same ratio): match widths, crop excess height.
    scaled_h = int(src_h * dst_w / src_w)
    img = img.resize((dst_w, scaled_h), Image.Resampling.LANCZOS)
    offset = (scaled_h - dst_h) // 2
    return img.crop((0, offset, dst_w, offset + dst_h))
def add_border(self, img, color=(255, 255, 255, 200), width=2, no_border=True):
"""给图像添加边框,可选择不添加边框"""
if no_border:
return img # 如果设置为无边框,直接返回原图
try:
w, h = img.size
new_img = Image.new('RGBA', (w, h), (0, 0, 0, 0))
draw = ImageDraw.Draw(new_img)
# 绘制边框(在四条边上)
for i in range(width):
# 上边框
draw.line([(i, i), (w-i-1, i)], fill=color, width=1)
# 右边框
draw.line([(w-i-1, i), (w-i-1, h-i-1)], fill=color, width=1)
# 下边框
draw.line([(i, h-i-1), (w-i-1, h-i-1)], fill=color, width=1)
# 左边框
draw.line([(i, i), (i, h-i-1)], fill=color, width=1)
# 合并原图和边框
result = img.copy()
result.alpha_composite(new_img)
return result
except Exception as e:
print(f"添加边框时出错: {str(e)}")
return img
def add_polaroid_frame(self, img, margin=20, bottom_margin=60, background_color=(255, 255, 255, 255)):
    """Wrap *img* in a polaroid-style frame with a soft drop shadow."""
    try:
        w, h = img.size
        frame_width = w + 2 * margin
        frame_height = h + margin + bottom_margin
        # White card with the photo pasted near the top (extra space below).
        card = Image.new('RGBA', (frame_width, frame_height), background_color)
        card.paste(img, (margin, margin))
        # Soft shadow: a translucent rectangle, blurred for a natural look.
        shadow = Image.new('RGBA', card.size, (0, 0, 0, 0))
        ImageDraw.Draw(shadow).rectangle([2, 2, frame_width - 2, frame_height - 2], fill=(0, 0, 0, 40))
        shadow = shadow.filter(ImageFilter.GaussianBlur(3))
        # Final composite: shadow offset behind the card.
        framed = Image.new('RGBA', (frame_width + 6, frame_height + 6), (0, 0, 0, 0))
        framed.paste(shadow, (6, 6))
        framed.paste(card, (0, 0), card)
        return framed
    except Exception as e:
        print(f"添加宝丽来相框时出错: {str(e)}")
        return img
def apply_image_effect(self, img, effect="none"):
"""应用各种图像效果 - 所有图片适度增强对比度和亮度"""
try:
# 适度增强对比度
contrast = ImageEnhance.Contrast(img)
enhanced = contrast.enhance(1.1) # 降低对比度系数从1.6降至1.3
# 轻微增强亮度
brightness = ImageEnhance.Brightness(enhanced)
enhanced = brightness.enhance(1.1) # 保持轻微增加亮度
# 轻微增强色彩饱和度
color = ImageEnhance.Color(enhanced)
enhanced = color.enhance(1.15) # 轻微降低饱和度从1.2降至1.15
return enhanced
except Exception as e:
print(f"增强图片效果时出错: {str(e)}")
return img
def create_collage_with_style(self, input_dir, style=None, target_size=None):
    """
    Build a collage canvas in the given style from images in a directory.

    Args:
        input_dir: directory containing the source images
        style: collage style name; a random enabled style when omitted/unknown
        target_size: output size, defaults to (900, 1200) (3:4 ratio)

    Returns:
        tuple: (collage image, list of selected image names), or
        (None, []) on any failure.
    """
    logging.info(f"--- Starting Collage Creation for Directory: {input_dir} ---")  # Start Log
    try:
        # Default output size is a 3:4 portrait canvas.
        if target_size is None:
            target_size = (900, 1200)
        # Pick a random enabled style when none (or an unknown one) is given.
        if style is None or style not in self.collage_styles:
            style = random.choice(self.collage_styles)
        logging.info(f"Using collage style: {style} with target size: {target_size}")
        if not os.path.exists(input_dir):
            logging.error(f"Input directory does not exist: {input_dir}")
            return None, []
        # Supported image formats.
        image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        try:
            all_files = os.listdir(input_dir)
            logging.info(f"Files found in directory: {all_files}")
        except Exception as e:
            logging.exception(f"Error listing directory {input_dir}: {e}")
            return None, []
        # Keep only regular files with a supported extension.
        all_images_names = [f for f in all_files
                            if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(input_dir, f))]
        logging.info(f"Filtered image files: {all_images_names}")
        if not all_images_names:
            logging.warning(f"No valid image files found in directory: {input_dir}")
            return None, []  # Return None if no images found
        # Number of images each style needs (default 4).
        num_images = 4
        if style == "mosaic":
            num_images = 9
        elif style == "filmstrip":
            num_images = 5
        elif style == "fullscreen":
            num_images = 6
        elif style == "vertical_stack":
            num_images = 2
        logging.info(f"Style '{style}' requires {num_images} images.")
        # Select images, repeating them when there are not enough.
        selected_images_names = []
        if len(all_images_names) < num_images:
            logging.warning(f"Need {num_images} images for style '{style}', but only found {len(all_images_names)}. Will repeat images.")
            if len(all_images_names) > 0:
                # Repeat available images to meet the count
                selected_images_names = (all_images_names * (num_images // len(all_images_names) + 1))[:num_images]
            else:
                logging.error("Cannot select images, none were found.")  # Should not happen due to earlier check
                return None, []
        else:
            # Enough images: sample without replacement.
            selected_images_names = random.sample(all_images_names, num_images)
        logging.info(f"Selected images for collage: {selected_images_names}")
        print(f"为拼贴图选择的图片: {selected_images_names}")
        # Load the selected images as RGBA.
        images = []
        for img_name in selected_images_names:
            image_path = os.path.join(input_dir, img_name)
            try:
                img = Image.open(image_path).convert('RGBA')
                images.append(img)
                logging.debug(f"Successfully loaded image: {img_name}")
            except Exception as e:
                logging.exception(f"Error loading image {img_name}: {e}")
        if len(images) == 0:
            logging.error("No images could be loaded. Cannot create collage.")
            return None, []
        # Pad with copies of random loaded images if some failed to load.
        while len(images) < num_images:
            images.append(random.choice(images).copy())
            logging.debug(f"Duplicated an image to reach required count of {num_images}")
        # Dispatch to the style-specific builder.
        collage = None
        if style == "grid_2x2":
            collage = self._create_grid_2x2_collage(images, target_size)
        elif style == "asymmetric":
            collage = self._create_asymmetric_collage(images, target_size)
        elif style == "filmstrip":
            collage = self._create_filmstrip_collage(images, target_size)
        elif style == "circles":
            collage = self._create_circles_collage(images, target_size)
        elif style == "overlap":
            collage = self._create_overlap_collage(images, target_size)
        elif style == "polaroid":
            collage = self._create_polaroid_collage(images, target_size)
        elif style == "mosaic":
            collage = self._create_mosaic_collage(images, target_size)
        elif style == "fullscreen":
            collage = self._create_fullscreen_collage(images, target_size)
        elif style == "vertical_stack":
            collage = self._create_vertical_stack_collage(images, target_size)
        if collage:
            logging.info(f"Successfully created collage with style: {style}")
        else:
            logging.error(f"Failed to create collage with style: {style}")
            return None, []
        # Close the source images now that the collage holds its own pixels.
        for img in images:
            if hasattr(img, 'close'):
                img.close()
        return collage, selected_images_names
    except Exception as e:
        logging.exception(f"Error in create_collage_with_style: {e}")
        traceback.print_exc()
        return None, []
def _create_grid_2x2_collage(self, images, target_size):
    """Tile up to four images seamlessly in a 2x2 grid on a white canvas."""
    canvas = Image.new('RGBA', target_size, (255, 255, 255, 255))  # white background
    cell_w = target_size[0] // 2
    cell_h = target_size[1] // 2
    # Anchor points: top-left, top-right, bottom-left, bottom-right.
    anchors = [(0, 0), (cell_w, 0), (0, cell_h), (cell_w, cell_h)]
    for idx, anchor in enumerate(anchors):
        if idx >= len(images):
            continue
        # Crop each image to exactly one cell; no borders, so tiles are seamless.
        tile = self.resize_and_crop(images[idx].copy(), (cell_w, cell_h))
        canvas.paste(tile, anchor, tile)
        print(f"添加拼贴画块 {idx+1} 到位置: {anchor}")
    print(f"无缝2x2网格拼贴画创建成功尺寸: {target_size}")
    return canvas
def _create_asymmetric_collage(self, images, target_size):
    """Arrange four images in an asymmetric layout with shuffled effects."""
    canvas = Image.new('RGBA', target_size, (255, 255, 255, 255))
    width, height = target_size
    # Regions as (x1, y1, x2, y2): large top-left, two stacked right cells,
    # full-width bottom strip.
    regions = [
        (0, 0, width*2//3, height//2),
        (width*2//3, 0, width, height//3),
        (width*2//3, height//3, width, height//2),
        (0, height//2, width, height)
    ]
    effects = ["none", "grayscale", "vintage", "color_boost"]
    random.shuffle(effects)
    for idx, (x1, y1, x2, y2) in enumerate(regions):
        if idx >= len(images):
            continue
        piece = self.resize_and_crop(images[idx].copy(), (x2 - x1, y2 - y1))
        piece = self.apply_image_effect(piece, effects[idx % len(effects)])
        # No border — seamless placement.
        canvas.paste(piece, (x1, y1), piece)
        print(f"添加非对称拼贴画块 {idx+1} 到位置: ({x1},{y1},{x2},{y2})")
    print(f"无缝非对称拼贴画创建成功,尺寸: {target_size}")
    return canvas
def _create_filmstrip_collage(self, images, target_size):
    """Stack five images vertically as film-strip frames with sprocket holes."""
    collage = Image.new('RGBA', target_size, (0, 0, 0, 0))
    width, height = target_size
    # Height of each strip (five strips fill the canvas).
    strip_height = height // 5
    # Black film border around each photo.
    film_border_width = 15
    for i in range(5):
        if i < len(images):
            img = images[i].copy()
            # Fit the photo inside the black border.
            img = self.resize_and_crop(img, (width - 2*film_border_width, strip_height - 2*film_border_width))
            # Black film frame with the photo centered by the border width.
            film_frame = Image.new('RGBA', (width, strip_height), (0, 0, 0, 255))
            film_frame.paste(img, (film_border_width, film_border_width), img)
            # Draw sprocket holes along the top and bottom edges.
            draw = ImageDraw.Draw(film_frame)
            hole_spacing = 30
            hole_radius = 5
            num_holes = width // hole_spacing
            for h in range(num_holes):
                hole_center_x = h * hole_spacing + hole_spacing // 2
                draw.ellipse((hole_center_x - hole_radius, 3, hole_center_x + hole_radius, 13), fill=(50, 50, 50, 255))
                draw.ellipse((hole_center_x - hole_radius, strip_height - 13, hole_center_x + hole_radius, strip_height - 3), fill=(50, 50, 50, 255))
            # Paste this strip at its vertical slot.
            y_position = i * strip_height
            collage.paste(film_frame, (0, y_position), film_frame)
            print(f"添加胶片条拼贴画块 {i+1} 到位置 y={y_position}")
    print(f"胶片条拼贴画创建成功,尺寸: {target_size}")
    return collage
    def _create_circles_collage(self, images, target_size):
        """Build a collage of four soft-edged circular photos, one per
        quadrant, composited over a faint light-gray background.

        Args:
            images: list of PIL images; only the first four are used.
            target_size: (width, height) of the output canvas.

        Returns:
            RGBA collage image of size ``target_size``.
        """
        collage = Image.new('RGBA', target_size, (0, 0, 0, 0))
        width, height = target_size
        # (center_x, center_y, radius) per circle. `width//2.5` is a float
        # floor-division, so some radii are floats; they are cast to int
        # before being used as pixel sizes below.
        circle_positions = [
            (width//4, height//4, width//2.5),      # top-left
            (width*3//4, height//4, width//3),      # top-right
            (width//4, height*3//4, width//3),      # bottom-left
            (width*3//4, height*3//4, width//2.5)   # bottom-right
        ]
        # Build a circular alpha mask for each image.
        for i, (center_x, center_y, radius) in enumerate(circle_positions):
            if i < len(images):
                img = images[i].copy()
                # Apply the configured image effect.
                img = self.apply_image_effect(img)
                # Resize to the circle's diameter — must be an integer.
                diam = int(radius*2)
                img = self.resize_and_crop(img, (diam, diam))
                # Full-size circular mask…
                mask = Image.new('L', img.size, 0)
                draw = ImageDraw.Draw(mask)
                draw.ellipse((0, 0, mask.width, mask.height), fill=255)
                # …with softly blurred edges.
                mask = mask.filter(ImageFilter.GaussianBlur(radius=5))
                # Use the mask as the photo's alpha channel.
                img.putalpha(mask)
                # Paste so the circle's center lands on the target point.
                paste_x = int(center_x - radius)
                paste_y = int(center_y - radius)
                collage.paste(img, (paste_x, paste_y), img)
                print(f"添加圆形拼贴画块 {i+1} 到位置: ({paste_x},{paste_y})")
        # Lay a translucent near-white layer underneath the circles.
        background = Image.new('RGBA', target_size, (245, 245, 245, 100))
        collage = Image.alpha_composite(background, collage)
        print(f"圆形拼贴画创建成功,尺寸: {target_size}")
        return collage
    def _create_polaroid_collage(self, images, target_size):
        """Build a polaroid-style collage while minimizing photo overlap.

        Up to four framed, slightly rotated photos are placed one per
        quadrant of a light-gray canvas; placement ranges force each photo
        to stay mostly inside its own quadrant.
        """
        collage = Image.new('RGBA', target_size, (240, 240, 240, 255))
        width, height = target_size
        # Candidate polaroid sizes — moderate sizes reduce overlap.
        polaroid_sizes = [
            (int(width//2.2), int(height//2.8)),   # large
            (int(width//2.5), int(height//3)),     # medium-large
            (int(width//2.8), int(height//3.5)),   # medium
            (int(width//3), int(height//4))        # medium-small
        ]
        # Shuffle so the size-to-quadrant pairing varies per call.
        random.shuffle(polaroid_sizes)
        # One grid cell per quadrant keeps photos mostly apart.
        grid_cells = [
            (0, 0, width//2, height//2),            # top-left
            (width//2, 0, width, height//2),        # top-right
            (0, height//2, width//2, height),       # bottom-left
            (width//2, height//2, width, height)    # bottom-right
        ]
        # Shuffle cell order as well.
        random.shuffle(grid_cells)
        # Rectangles already pasted (recorded but not used for collision checks).
        placed_areas = []
        for i, img_size in enumerate(polaroid_sizes):
            if i < len(images) and i < len(grid_cells):
                img = images[i].copy()
                # Resize and style the photo.
                img = self.resize_and_crop(img, img_size)
                # Apply the configured image effect.
                img = self.apply_image_effect(img)
                # Add the white polaroid frame.
                img = self.add_polaroid_frame(img)
                # Small tilt between -3 and 3 degrees (expand keeps corners).
                rotation = random.uniform(-3, 3)
                img = img.rotate(rotation, expand=True, resample=Image.Resampling.BICUBIC)
                # Constrain placement to the current grid cell.
                cell = grid_cells[i]
                cell_x1, cell_y1, cell_x2, cell_y2 = cell
                # Cell dimensions (computed but unused below).
                cell_width = cell_x2 - cell_x1
                cell_height = cell_y2 - cell_y1
                # Valid paste range: allow 20% overhang past the cell's
                # top/left edge while keeping at least 80% inside the cell.
                min_x = max(10, cell_x1 - img.width * 0.2)  # 20% overhang left
                max_x = min(width - img.width - 10, cell_x2 - img.width * 0.8)  # 80% containment
                min_y = max(10, cell_y1 - img.height * 0.2)  # 20% overhang top
                max_y = min(height - img.height - 10, cell_y2 - img.height * 0.8)  # 80% containment
                # Fall back to the cell center when a range collapses.
                if min_x >= max_x:
                    center_x = (cell_x1 + cell_x2) // 2
                    min_x = max(10, center_x - img.width // 2)
                    max_x = min_x + 1
                if min_y >= max_y:
                    center_y = (cell_y1 + cell_y2) // 2
                    min_y = max(10, center_y - img.height // 2)
                    max_y = min_y + 1
                # Random position within the valid range.
                paste_x = random.randint(int(min_x), int(max_x))
                paste_y = random.randint(int(min_y), int(max_y))
                # Record where this photo landed.
                placed_areas.append((paste_x, paste_y, paste_x + img.width, paste_y + img.height))
                # Paste onto the collage (img's alpha acts as the mask).
                collage.paste(img, (paste_x, paste_y), img)
                print(f"添加宝丽来风格块 {i+1} 到位置: ({paste_x},{paste_y}),尺寸: {img.size},单元: {cell}")
        print(f"宝丽来风格拼贴画创建成功,尺寸: {target_size}")
        return collage
def _create_overlap_collage(self, images, target_size):
"""创建无重叠风格拼贴画(不允许图片重叠)"""
collage = Image.new('RGBA', target_size, (255, 255, 255, 255))
width, height = target_size
# 为了避免重叠,计算每个图像区域的大小
grid_size = 2 # 2x2网格
img_width = width // grid_size
img_height = height // grid_size
# 定义网格位置 - 确保无重叠
positions = [
(0, 0), # 左上
(img_width, 0), # 右上
(0, img_height), # 左下
(img_width, img_height) # 右下
]
# 添加图片到位置
for i, position in enumerate(positions):
if i < len(images):
img = images[i].copy()
# 调整大小
img = self.resize_and_crop(img, (img_width, img_height))
# 应用效果
img = self.apply_image_effect(img)
# 粘贴到拼贴画
collage.paste(img, position)
print(f"添加无重叠拼贴画块 {i+1} 到位置: {position},尺寸: {img_width}x{img_height}")
print(f"无重叠拼贴画创建成功,尺寸: {target_size}")
return collage
def _create_mosaic_collage(self, images, target_size):
"""创建马赛克风格拼贴画需要9张图片无重叠"""
collage = Image.new('RGBA', target_size, (255, 255, 255, 255))
width, height = target_size
# 创建3x3网格确保无重叠
grid_width = width // 3
grid_height = height // 3
# 生成网格位置
positions = []
for row in range(3):
for col in range(3):
positions.append((col * grid_width, row * grid_height))
# 将图像粘贴到马赛克位置
for i, position in enumerate(positions):
if i < len(images):
img = images[i].copy()
# 调整大小
img = self.resize_and_crop(img, (grid_width, grid_height))
# 应用效果
img = self.apply_image_effect(img)
# 粘贴到拼贴画
collage.paste(img, position)
print(f"添加马赛克拼贴画块 {i+1} 到位置: {position},尺寸: {grid_width}x{grid_height}")
print(f"无重叠马赛克拼贴画创建成功,尺寸: {target_size}")
return collage
def _create_fullscreen_collage(self, images, target_size):
"""创建全覆盖拼图样式(完全填满画布,无空白,无重叠)"""
width, height = target_size
collage = Image.new('RGBA', target_size, (255, 255, 255, 255)) # 白色背景
# 确保至少有6张图片
while len(images) < 6:
if images:
images.append(random.choice(images).copy())
else:
return None
# 定义区域划分 - 按照完全填满画布设计,确保无重叠
regions = [
# 左列 - 上中下三块
(0, 0, width//2, height//3), # 左上
(0, height//3, width//2, height*2//3), # 左中
(0, height*2//3, width//2, height), # 左下
# 右列 - 上中下三块
(width//2, 0, width, height//3), # 右上
(width//2, height//3, width, height*2//3), # 右中
(width//2, height*2//3, width, height) # 右下
]
# 添加图片到各个区域,确保完全覆盖无重叠
for i, (x1, y1, x2, y2) in enumerate(regions):
if i < len(images):
img = images[i].copy()
# 调整大小以完全填充区域
region_width = x2 - x1
region_height = y2 - y1
img = self.resize_and_crop(img, (region_width, region_height))
# 应用轻微的图像效果
img = self.apply_image_effect(img)
# 粘贴到画布上
collage.paste(img, (x1, y1))
print(f"添加全屏拼图块 {i+1} 到位置: ({x1}, {y1}, {x2}, {y2}),尺寸: {region_width}x{region_height}")
print(f"无重叠全覆盖拼图创建成功,尺寸: {target_size}")
return collage
    def _create_vertical_stack_collage(self, images, target_size):
        """Stack two photos vertically (top/bottom split of the canvas).

        Pads the input list in place by duplicating the first photo when
        only one is supplied; returns None if no photos are available.
        """
        collage = Image.new('RGBA', target_size, (255, 255, 255, 255))  # white background
        width, height = target_size
        # Guarantee at least two images.
        while len(images) < 2:
            if images:
                images.append(images[0].copy())
            else:
                print("没有可用的图片来创建上下拼图")
                return None
        # Optional gap between the photos (0 = seamless join).
        gap = 0
        # Height of each photo.
        img_height = (height - gap) // 2
        # Paste origins for the two photos.
        positions = [
            (0, 0),                   # top photo
            (0, img_height + gap)     # bottom photo
        ]
        # Paste the photos.
        for i, position in enumerate(positions):
            if i < len(images) and i < 2:  # only the first two photos are used
                img = images[i].copy()
                # Fit the full canvas width.
                img = self.resize_and_crop(img, (width, img_height))
                # Apply the light image effect.
                img = self.apply_image_effect(img)
                # NOTE(review): `img` doubles as the paste mask — assumes the
                # processed image has an alpha channel; confirm upstream.
                collage.paste(img, position, img)
                print(f"添加上下拼图块 {i+1} 到位置: {position}")
        # Optional separator line when a gap is configured.
        if gap > 0:
            draw = ImageDraw.Draw(collage)
            line_y = img_height + gap // 2
            draw.line([(0, line_y), (width, line_y)], fill=(200, 200, 200, 255), width=gap)
        print(f"上下拼图创建成功,尺寸: {target_size}")
        return collage
def save_collage(self, collage, output_path):
"""保存拼贴画"""
if collage:
# 确保有背景 - 创建白色背景并将拼贴画合并上去
background = Image.new('RGB', collage.size, (255, 255, 255))
# 如果拼贴画有透明通道,将其合并到白色背景上
if collage.mode == 'RGBA':
background.paste(collage, (0, 0), collage)
final_image = background
else:
final_image = collage.convert('RGB')
final_image.save(output_path)
print(f"无缝拼贴画已保存: {output_path}")
return output_path
return None
def set_collage_style(self, collage_style):
"""设置拼贴画样式"""
self.collage_style = collage_style
return self.collage_style
class PosterNotesCreator:
    """
    Treats the original poster as the main image and randomly selects
    additional images to serve as note images.
    Ensures the selected note images do not duplicate the images already
    used in the poster.
    """
    def __init__(self, output_handler: OutputHandler):
        """
        Initialize the PosterNotesCreator.

        Args:
            output_handler: OutputHandler instance used to persist generated
                images and their metadata.
        """
        # All image/metadata persistence is delegated to this handler.
        self.output_handler = output_handler
        logging.info("PosterNotesCreator 初始化完成")
def create_notes_images(
self,
run_id: str,
topic_index: int,
variant_index: int,
poster_image_path: str,
poster_metadata_path: str,
source_image_dir: str,
num_additional_images: int,
output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
"""
创建笔记图像
Args:
run_id: 运行ID
topic_index: 主题索引
variant_index: 变体索引
poster_image_path: 海报图像路径
poster_metadata_path: 海报元数据路径
source_image_dir: 源图像目录
num_additional_images: 要使用的额外图像数量
output_filename_template: 输出文件名模板
Returns:
List[str]: 保存的笔记图像路径列表
"""
# 检查输入路径是否存在
if not os.path.exists(poster_image_path):
logger.error(f"海报图像不存在: {poster_image_path}")
return []
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外笔记")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_additional_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的笔记数量 ({num_additional_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
selected_images = random.sample(available_images, num_additional_images)
logger.info(f"已选择 {len(selected_images)} 张图像作为笔记")
# 保存选择的笔记图像
saved_paths = []
for i, image_filename in enumerate(selected_images):
try:
# 加载图像
image_path = os.path.join(source_image_dir, image_filename)
image = Image.open(image_path)
# 生成输出文件名
output_filename = output_filename_template.format(index=i+1)
# 创建元数据
note_metadata = {
"original_image": image_filename,
"note_index": i + 1,
"source_dir": source_image_dir,
"associated_poster": os.path.basename(poster_image_path)
}
# 使用输出处理器保存图像
saved_path = self.output_handler.handle_generated_image(
run_id,
topic_index,
variant_index,
'note', # 图像类型为note
image,
output_filename,
note_metadata
)
saved_paths.append(saved_path)
logger.info(f"已保存笔记图像 {i+1}/{len(selected_images)}: {saved_path}")
except Exception as e:
logger.error(f"处理图像时出错 '{image_filename}': {e}")
return saved_paths
    def create_additional_images(
        self,
        run_id: str,
        topic_index: int,
        variant_index: int,
        poster_metadata_path: str,
        source_image_dir: str,
        num_additional_images: int = 3,
        output_filename_template: str = "additional_{index}.jpg",
        variation_strength: str = "medium",
        extra_effects: bool = True,
        collage_style: str = "grid_2x2"  # grid mode by default
    ) -> List[str]:
        """
        Build 2x2 grid collages from images the poster did not use.
        Each output consumes four unique source images; collages are
        rendered in parallel worker processes.

        Args:
            run_id: run identifier
            topic_index: topic index
            variant_index: variant index
            poster_metadata_path: path to the poster's metadata JSON
            source_image_dir: directory of candidate source images
            num_additional_images: number of collages to produce (default 3)
            output_filename_template: template for output filenames
            variation_strength: 'low', 'medium' or 'high'
            extra_effects: whether to apply extra effects
            collage_style: collage style, fixed to the 2x2 grid
        Returns:
            List[str]: paths of the saved collage images
        """
        logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图")
        # Each 2x2 collage needs four unique images; request them all up front.
        total_images_needed = 4 * num_additional_images
        candidate_images = self.get_candidate_images(
            poster_metadata_path,
            source_image_dir,
            total_images_needed  # total number of unique images required
        )
        if not candidate_images:
            logger.warning("没有找到合适的候选图像")
            return []
        # Scale down the output count when there aren't enough unique images.
        if len(candidate_images) < total_images_needed:
            adjusted_num_images = len(candidate_images) // 4
            logger.warning(
                f"可用图像数量 ({len(candidate_images)}) 不足以生成 {num_additional_images} 张不重复的2x2配图 "
                f"(需要 {total_images_needed} 张)。将只生成 {adjusted_num_images} 张配图。"
            )
            num_additional_images = adjusted_num_images
            if num_additional_images == 0:
                logger.warning("可用图像数量少于4张无法创建任何2x2拼图。")
                return []
        elif len(candidate_images) < 4:  # even after adjustment, need >= 4
            logger.warning(f"可用图像数量({len(candidate_images)})少于4张无法创建2x2拼图")
            return []
        # Deterministic seed derived from run/topic/variant identifiers so
        # reruns produce the same selection.
        seed_str = f"{run_id}_{topic_index}_{variant_index}"
        seed = sum(ord(c) for c in seed_str)
        random.seed(seed)
        logger.info(f"使用随机种子: {seed},基于: {seed_str}")
        # Shuffle once, then slice sequential groups of four below — this
        # guarantees no image is reused across collages.
        random.shuffle(candidate_images)
        # Render the collages in parallel worker processes.
        # NOTE(review): submitting the bound method self.process_multiple_images
        # requires self (incl. output_handler) to be picklable — confirm.
        saved_paths = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=min(4, num_additional_images)) as executor:
            future_to_image_set = {}
            start_index = 0  # next unused position in candidate_images
            for i in range(num_additional_images):
                # Take the next four unique images from the shuffled list.
                end_index = start_index + 4
                if end_index > len(candidate_images):  # defensive; should not happen
                    logger.error(f"内部错误:尝试获取的图像索引超出范围 ({start_index}-{end_index}),可用图像: {len(candidate_images)}")
                    break
                selected_images = candidate_images[start_index:end_index]
                start_index = end_index  # advance for the next collage
                # Per-collage seed keeps each worker deterministic.
                image_seed = seed + i
                future = executor.submit(
                    self.process_multiple_images,
                    run_id,
                    topic_index,
                    variant_index,
                    source_image_dir,
                    selected_images,
                    i,
                    output_filename_template.format(index=i+1),
                    image_seed,
                    variation_strength,
                    extra_effects,
                    collage_style
                )
                future_to_image_set[future] = (i, selected_images)
            # Collect results as the workers finish.
            for future in concurrent.futures.as_completed(future_to_image_set):
                i, selected_images = future_to_image_set[future]
                try:
                    saved_path = future.result()
                    if saved_path:
                        saved_paths.append(saved_path)
                        logger.info(f"已保存额外配图 {i+1}/{num_additional_images}: {saved_path}")
                except Exception as e:
                    logger.error(f"处理图像时出错 '{', '.join(selected_images)}': {e}")
                    logger.error(traceback.format_exc())
        # Restore nondeterministic seeding for subsequent callers.
        random.seed()
        return saved_paths
    def process_multiple_images(
        self,
        run_id,
        topic_index,
        variant_index,
        source_dir,
        image_filenames,
        index,
        output_filename,
        seed,
        variation_strength,
        extra_effects,
        collage_style="grid_2x2"
    ):
        """Render one 2x2 grid collage from the given images and save it.

        Designed to run inside a worker process. Returns the saved path, or
        None on failure. `variation_strength` and `extra_effects` are
        accepted for interface compatibility but not used here.
        """
        try:
            # Delegate rendering to the core.simple_collage module.
            style = "grid_2x2"  # fixed grid style regardless of collage_style
            # Stage the chosen images in a temp dir, since process_collage
            # takes a directory rather than a file list.
            import tempfile
            import shutil
            with tempfile.TemporaryDirectory() as temp_dir:
                # Copy the selected images into the temp directory.
                temp_image_paths = []
                for img_filename in image_filenames:
                    src_path = os.path.join(source_dir, img_filename)
                    dst_path = os.path.join(temp_dir, img_filename)
                    shutil.copy2(src_path, dst_path)
                    temp_image_paths.append(dst_path)
                logger.info(f"为网格拼图准备了 {len(temp_image_paths)} 张图像: {', '.join(image_filenames)}")
                # Seed both RNGs for reproducible collage layout.
                if seed is not None:
                    random.seed(seed)
                    np.random.seed(seed)
                # Render the collage at a 3:4 aspect ratio.
                target_size = (900, 1200)  # 3:4 ratio
                collage_images, used_image_filenames = process_collage(
                    temp_dir,
                    style=style,
                    target_size=target_size,
                    output_count=1,
                )
                # Restore nondeterministic seeding.
                if seed is not None:
                    random.seed()
                    np.random.seed()
                if not collage_images or len(collage_images) == 0:
                    logger.error(f"拼图模块没有生成有效的图像")
                    return None
                processed_image = collage_images[0]
                # Force RGB mode — avoids "cannot write mode RGBA as JPEG".
                if processed_image.mode == 'RGBA':
                    logger.debug(f"将RGBA图像转换为RGB模式")
                    # Flatten onto white using the alpha channel as the mask.
                    background = Image.new('RGB', processed_image.size, (255, 255, 255))
                    background.paste(processed_image, mask=processed_image.split()[3])  # alpha as mask
                    processed_image = background
                elif processed_image.mode != 'RGB':
                    logger.debug(f"{processed_image.mode}图像转换为RGB模式")
                    processed_image = processed_image.convert('RGB')
                # Metadata describing the collage's provenance.
                additional_metadata = {
                    "original_images": image_filenames,
                    "additional_index": index + 1,
                    "source_dir": source_dir,
                    "is_additional_image": True,
                    "processed": True,
                    "aspect_ratio": "3:4",
                    "collage_style": style,
                    "grid_size": "2x2"
                }
                # Persist through the configured output handler.
                return self.output_handler.handle_generated_image(
                    run_id,
                    topic_index,
                    variant_index,
                    'additional',  # image type: additional
                    processed_image,
                    output_filename,
                    additional_metadata
                )
        except Exception as e:
            logger.error(f"处理多张图像时出错: {e}")
            logger.error(traceback.format_exc())
            return None
def get_candidate_images(self, poster_metadata_path, source_image_dir, num_images):
"""获取候选图像列表,排除已用于海报的图像"""
# 检查输入路径是否存在
if not os.path.exists(poster_metadata_path):
logger.error(f"海报元数据不存在: {poster_metadata_path}")
return []
if not os.path.exists(source_image_dir) or not os.path.isdir(source_image_dir):
logger.error(f"源图像目录不存在: {source_image_dir}")
return []
# 从元数据文件中读取已使用的图像信息
try:
with open(poster_metadata_path, 'r', encoding='utf-8') as f:
poster_metadata = json.load(f)
except Exception as e:
logger.error(f"无法读取海报元数据: {e}")
return []
# 获取已经在海报中使用的图像
used_images = []
if 'collage_images' in poster_metadata:
used_images = poster_metadata['collage_images']
logger.info(f"海报中已使用 {len(used_images)} 张图像: {', '.join(used_images)}")
# 列出源目录中的所有图像文件
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
available_images = [
f for f in os.listdir(source_image_dir)
if os.path.isfile(os.path.join(source_image_dir, f)) and
f.lower().endswith(image_extensions)
]
if not available_images:
logger.error(f"源目录中没有找到图像: {source_image_dir}")
return []
logger.info(f"源目录中找到 {len(available_images)} 张图像")
# 过滤掉已经在海报中使用的图像
available_images = [img for img in available_images if img not in used_images]
if not available_images:
logger.warning("所有图像都已在海报中使用,无法创建额外配图")
return []
logger.info(f"过滤后可用图像数量: {len(available_images)}")
# 如果可用图像少于请求数量,进行警告但继续处理
if len(available_images) < num_images:
logger.warning(
f"可用图像数量 ({len(available_images)}) 少于请求的配图数量 ({num_images})"
f"将使用所有可用图像"
)
selected_images = available_images
else:
# 随机选择额外图像
random.seed(sum(map(ord, ''.join(available_images)))) # 确保结果一致性
selected_images = random.sample(available_images, num_images)
random.seed() # 重置随机种子
return selected_images
    def process_single_image(
        self,
        run_id,
        topic_index,
        variant_index,
        image_path,
        image_filename,
        index,
        source_dir,
        output_filename,
        seed,
        variation_strength,
        extra_effects,
        collage_style="grid_2x2"
    ):
        """Render a collage from a single source image and save it.

        Safe to run in a separate worker process. Returns the saved path or
        None on failure. `variation_strength` and `extra_effects` are
        accepted for interface compatibility but not used here.
        """
        try:
            # Delegate rendering to the core.simple_collage module.
            style = collage_style if collage_style else "slice"
            # Stage the image in a temp dir, since process_collage takes a
            # directory rather than a file path.
            import tempfile
            import shutil
            with tempfile.TemporaryDirectory() as temp_dir:
                # Copy the image into the temp directory.
                temp_image_path = os.path.join(temp_dir, image_filename)
                shutil.copy2(image_path, temp_image_path)
                # Seed both RNGs for a reproducible layout.
                if seed is not None:
                    random.seed(seed)
                    np.random.seed(seed)
                # Render at a 3:4 aspect ratio.
                target_size = (900, 1200)  # 3:4 ratio
                collage_images, used_image_filenames = process_collage(
                    temp_dir,
                    style=style,
                    target_size=target_size,
                    output_count=1
                )
                # Restore nondeterministic seeding.
                if seed is not None:
                    random.seed()
                    np.random.seed()
                if not collage_images or len(collage_images) == 0:
                    logger.error(f"拼图模块没有生成有效的图像: {image_filename}")
                    return None
                processed_image = collage_images[0]
                # Force RGB mode — avoids "cannot write mode RGBA as JPEG".
                if processed_image.mode == 'RGBA':
                    logger.debug(f"将RGBA图像转换为RGB模式: {image_filename}")
                    # Flatten onto white using the alpha channel as the mask.
                    background = Image.new('RGB', processed_image.size, (255, 255, 255))
                    background.paste(processed_image, mask=processed_image.split()[3])  # alpha as mask
                    processed_image = background
                elif processed_image.mode != 'RGB':
                    logger.debug(f"{processed_image.mode}图像转换为RGB模式: {image_filename}")
                    processed_image = processed_image.convert('RGB')
                # Metadata describing the image's provenance.
                additional_metadata = {
                    "original_image": image_filename,
                    "additional_index": index + 1,
                    "source_dir": source_dir,
                    "is_additional_image": True,
                    "processed": True,
                    "aspect_ratio": "3:4",
                    "collage_style": style
                }
                # Persist through the configured output handler.
                return self.output_handler.handle_generated_image(
                    run_id,
                    topic_index,
                    variant_index,
                    'additional',  # image type: additional
                    processed_image,
                    output_filename,
                    additional_metadata
                )
        except Exception as e:
            logger.error(f"处理图像时出错 '{image_filename}': {e}")
            logger.error(traceback.format_exc())
            return None
    def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image:
        """
        Add noise in the DCT domain to defeat pHash (requires SciPy) — hardened version.

        Args:
            image: input image (a grayscale image or luminance channel is recommended)
            intensity: noise strength (0-1)
            block_size: DCT block size (usually 8)
        Returns:
            The image with noise added; the original image on failure or
            when SciPy is unavailable.
        """
        if not SCIPY_AVAILABLE:
            logger.warning("Scipy 未安装无法执行DCT噪声注入。请运行 'pip install scipy'")
            # Fall back to returning the input unchanged.
            return image
        try:
            logger.debug(f"应用强化DCT噪声强度: {intensity:.3f}")
            # Work on a grayscale copy (luminance) of the input.
            if image.mode != 'L':
                gray_image = image.convert('L')
            else:
                gray_image = image
            img_array = np.array(gray_image, dtype=float)
            h, w = img_array.shape
            # Pad so both dimensions are multiples of the block size.
            h_pad = (block_size - h % block_size) % block_size
            w_pad = (block_size - w % block_size) % block_size
            if h_pad != 0 or w_pad != 0:
                img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect')
                padded_h, padded_w = img_array.shape
            else:
                padded_h, padded_w = h, w
            # Target coefficient window: the low-frequency 4x4 corner
            # (the DC coefficient is compensated separately below).
            target_h, target_w = 4, 4
            for y in range(0, padded_h, block_size):
                for x in range(0, padded_w, block_size):
                    block = img_array[y:y+block_size, x:x+block_size]
                    # 2-D DCT via two 1-D orthonormal transforms.
                    dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
                    # --- hardened noise logic ---
                    # 1. Fixed base amplitude, independent of coefficient size.
                    noise_amplitude = intensity * 30  # tunable base amplitude
                    # 2. Additive noise candidate.
                    # NOTE(review): `noise` values are never applied below —
                    # only its shape feeds apply_h/apply_w; the multiplicative
                    # `factor` path is the one actually used. Removing it would
                    # change the seeded RNG stream, so it is left in place.
                    noise = np.random.uniform(-noise_amplitude, noise_amplitude,
                                           (min(block_size, target_h), min(block_size, target_w)))
                    # 3. Apply perturbation to the low-frequency window.
                    noise_h, noise_w = noise.shape
                    # Clamp indices to the actual block size.
                    apply_h, apply_w = min(noise_h, dct_block.shape[0]), min(noise_w, dct_block.shape[1])
                    # Multiplicative noise — tends to preserve structure better.
                    factor = np.random.uniform(1.0 - intensity * 0.8, 1.0 + intensity * 0.8,
                                             (min(block_size, target_h), min(block_size, target_w)))
                    dct_block[0:apply_h, 0:apply_w] *= factor[0:apply_h, 0:apply_w]
                    dct_block[0, 0] /= factor[0, 0]  # roughly restore the DC coefficient
                    # --- end hardened noise logic ---
                    idct_block = idct(idct(dct_block.T, norm='ortho').T, norm='ortho')
                    img_array[y:y+block_size, x:x+block_size] = idct_block
            # Crop away the padding and clamp to valid pixel range.
            if h_pad != 0 or w_pad != 0:
                img_array = img_array[:h, :w]
            img_array = np.clip(img_array, 0, 255)
            modified_gray = Image.fromarray(img_array.astype(np.uint8))
            # For RGB inputs, blend the perturbed luminance back into each channel.
            if image.mode == 'RGB' and gray_image is not image:
                blend_factor = 0.35  # slightly stronger blend
                r, g, b = image.split()
                r = Image.blend(r, modified_gray, blend_factor)
                g = Image.blend(g, modified_gray, blend_factor)
                b = Image.blend(b, modified_gray, blend_factor)
                merged_image = Image.merge('RGB', (r, g, b))
            else:
                merged_image = modified_gray
            logger.debug("强化DCT噪声应用成功。")
            return merged_image
        except Exception as e:
            logger.error(f"强化DCT噪声注入出错: {e}")
            return image
def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image:
"""调用强化的 add_dct_noise 方法"""
logger.debug(f"调用强化add_dct_noise对抗pHash强度: {intensity:.3f}")
return self.add_dct_noise(image, intensity=intensity)
    def apply_smart_crop_resize(self, image: Image.Image, strength: str = "medium") -> Image.Image:
        """
        Apply a small random crop plus rescale-to-original to resist hash
        algorithms (enhanced version).

        Args:
            image: input image
            strength: processing strength ('low', 'medium', 'high')
        Returns:
            The processed image; the original image on failure.
        """
        try:
            original_width, original_height = image.size
            logger.debug(f"应用智能裁剪+重缩放 (强度: {strength}), 原始尺寸: {original_width}x{original_height}")
            # Maximum pixels to shave off each edge (values raised vs. the
            # earlier revision, per the inline notes).
            if strength == "low":
                max_crop = 3  # was 1
            elif strength == "high":
                max_crop = 10  # was 3
            else:  # medium
                max_crop = 6  # was 2
            logger.debug(f"增强型智能裁剪: max_crop = {max_crop} 像素")
            # Independent random crop amount per edge.
            crop_left = random.randint(0, max_crop)
            crop_top = random.randint(0, max_crop)
            crop_right = random.randint(0, max_crop)
            crop_bottom = random.randint(0, max_crop)
            # Resulting crop box.
            left = crop_left
            top = crop_top
            right = original_width - crop_right
            bottom = original_height - crop_bottom
            # Guard against degenerate boxes (over-cropping tiny images).
            if left >= right or top >= bottom:
                logger.warning("智能裁剪计算无效(裁剪过多),尝试使用较小裁剪量。")
                # Retry once with a crop bound that cannot consume the image.
                safe_max_crop = min(original_width // 4, original_height // 4, max_crop)  # keep most of the image
                crop_left = random.randint(0, safe_max_crop)
                crop_top = random.randint(0, safe_max_crop)
                crop_right = random.randint(0, safe_max_crop)
                crop_bottom = random.randint(0, safe_max_crop)
                left = crop_left
                top = crop_top
                right = original_width - crop_right
                bottom = original_height - crop_bottom
                if left >= right or top >= bottom:  # still degenerate — give up
                    logger.error("智能裁剪再次失败,跳过此步骤。")
                    return image
            logger.debug(f"  裁剪参数: L={crop_left}, T={crop_top}, R={crop_right}, B={crop_bottom}")
            logger.debug(f"  裁剪区域: ({left}, {top}, {right}, {bottom})")
            # Crop…
            cropped_image = image.crop((left, top, right, bottom))
            # …then scale back to the original size with a high-quality filter.
            logger.debug(f"  将裁剪后图像 ({cropped_image.width}x{cropped_image.height}) 缩放回 ({original_width}x{original_height})")
            resampling_filter = Image.LANCZOS  # high-quality interpolation
            resized_image = cropped_image.resize((original_width, original_height), resample=resampling_filter)
            logger.debug("智能裁剪+重缩放应用成功。")
            return resized_image
        except Exception as e:
            logger.error(f"智能裁剪+重缩放时出错: {e}")
            return image  # return the original on failure
    def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image:
        """
        Perturb the image's color histogram to resist color-statistics-based
        image matching.

        Args:
            image: input image
            strength: perturbation strength (0-1)
        Returns:
            The processed image (RGB).
        """
        logger.debug(f"扰动颜色直方图,强度: {strength:.3f}")
        # Work in RGB.
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Operate on a numpy copy of the pixels.
        img_array = np.array(image)
        height, width, channels = img_array.shape
        # Perturb each channel independently.
        for channel in range(channels):
            # 64-bin histogram of this channel.
            hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256))
            # Dominant bins: roughly the top 30% by frequency.
            threshold = np.percentile(hist, 70)
            significant_bins = np.where(hist > threshold)[0]
            if len(significant_bins) > 0:
                for bin_idx in significant_bins:
                    # Intensity range covered by this bin.
                    bin_width = 256 // 64
                    color_low = bin_idx * bin_width
                    color_high = (bin_idx + 1) * bin_width
                    # Mask of pixels whose value falls inside the bin.
                    mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high)
                    if np.any(mask):
                        # Random signed offset scaled by strength and bin width.
                        offset = int(strength * bin_width * (random.random() - 0.5) * 2)
                        # Shift the masked pixels, clamped to 0-255.
                        img_array[:,:,channel][mask] = np.clip(
                            img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8)
        # Back to a PIL image.
        logger.debug("颜色直方图扰动成功。")
        return Image.fromarray(img_array)
def strip_metadata(self, image: Image.Image) -> Image.Image:
"""
移除图像中的所有元数据 (修复版)
Args:
image: 输入图像
Returns:
无元数据的图像
"""
logger.debug("移除图像元数据...")
try:
# 确保图像处于适合保存的模式例如RGB
if image.mode == 'RGBA':
# 创建一个白色背景然后粘贴带有alpha的图像
background = Image.new("RGB", image.size, (255, 255, 255))
background.paste(image, mask=image.split()[3]) # 3 is the alpha channel
image_to_save = background
elif image.mode == 'P':
# 带调色板的图像转换为RGB
image_to_save = image.convert('RGB')
elif image.mode == 'L':
# 灰度图通常可以保存为JPEG或PNG
image_to_save = image
elif image.mode == 'RGB':
image_to_save = image # 已经是RGB直接使用
else:
logger.warning(f"未知的图像模式 {image.mode}尝试转换为RGB进行元数据剥离。")
image_to_save = image.convert('RGB')
# 保存到内存缓冲区强制使用JPEG格式以剥离元数据
data = io.BytesIO()
# --- FIX: 强制使用JPEG格式保存到缓冲区 ---
save_format = 'JPEG'
logger.debug(f"强制使用 {save_format} 格式保存以剥离元数据")
image_to_save.save(data, format=save_format, quality=95) # 使用高质量JPEG
# --- END FIX ---
data.seek(0) # 重置缓冲区指针
reloaded_image = Image.open(data)
logger.debug("元数据移除成功。")
return reloaded_image
except Exception as e:
logger.error(f"移除元数据时出错: {e}")
logger.error(traceback.format_exc()) # 打印详细错误
return image # 出错时返回原图
    def apply_overlay_noise(self, image: Image.Image, alpha: int = 10, noise_type: str = 'uniform') -> Image.Image:
        """
        Overlay a low-opacity noise layer on the image.

        Args:
            image: input image
            alpha: alpha value of the overlaid noise layer (0-255)
            noise_type: 'gaussian' or 'uniform'
        Returns:
            The image with the noise overlay, converted to RGB; the original
            image on failure.
        """
        try:
            logger.debug(f"应用低透明度噪声叠加: alpha={alpha}, type={noise_type}")
            # Work in RGBA so alpha compositing is possible.
            if image.mode != 'RGBA':
                base_image = image.convert('RGBA')
            else:
                base_image = image.copy()  # do not mutate the caller's image
            width, height = base_image.size
            # Grayscale noise field.
            if noise_type == 'gaussian':
                # Gaussian noise centered at 128, clipped to 0-255.
                noise_array = np.random.normal(loc=128, scale=40, size=(height, width)).clip(0, 255).astype(np.uint8)
            else:  # uniform
                noise_array = np.random.randint(0, 256, size=(height, width), dtype=np.uint8)
            noise_image = Image.fromarray(noise_array, mode='L')
            # Promote the noise to RGBA and give it the requested opacity.
            noise_rgba = noise_image.convert('RGBA')
            # Constant alpha channel at the requested level.
            alpha_channel = Image.new('L', noise_image.size, alpha)
            noise_rgba.putalpha(alpha_channel)
            # Composite the noise over the base image
            # (alpha_composite requires both operands in RGBA).
            combined_image = Image.alpha_composite(base_image, noise_rgba)
            # Downstream consumers expect RGB; convert back. Note this drops
            # any transparency the original RGBA input carried.
            final_image = combined_image.convert('RGB')
            logger.debug("低透明度噪声叠加应用成功。")
            return final_image
        except Exception as e:
            logger.error(f"应用叠加噪声时出错: {e}")
            logger.error(traceback.format_exc())  # detailed error for debugging
            return image  # return the original on failure
    def apply_ahash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
        """
        aHash-specific disruption: insert thin brightness bands (enhanced version).

        Args:
            image: input image
            strength: processing strength ('low', 'medium', 'high')
        Returns:
            The processed image; the original image on failure.
        """
        try:
            # Strength parameters (values raised vs. the earlier revision).
            if strength == "low":
                intensity = 0.08  # was 0.02
                bands = 2
            elif strength == "high":
                intensity = 0.18  # was 0.04
                bands = 4
            else:  # medium
                intensity = 0.12  # was 0.03
                bands = 3
            logger.debug(f"应用aHash特定干扰 (亮度带) (增强版), 强度:{strength}, 条带数:{bands}, 强度因子:{intensity:.3f}")
            # Work on a copy via direct pixel access.
            result = image.copy()
            width, height = result.size
            pixels = result.load()
            # Bands run either all horizontal or all vertical.
            is_horizontal = random.choice([True, False])
            band_positions = []
            if is_horizontal:
                for _ in range(bands):
                    base_pos = random.randint(0, height - 1)
                    band_positions.append(base_pos)
            else:
                for _ in range(bands):
                    base_pos = random.randint(0, width - 1)
                    band_positions.append(base_pos)
            # Scan every pixel; band rows/columns are 3 pixels wide (pos ± 1).
            # NOTE: O(width*height) pure-Python loop — slow on large images.
            for y_idx in range(height):
                for x_idx in range(width):
                    is_on_band = False
                    if is_horizontal:
                        for pos in band_positions:
                            if abs(y_idx - pos) <= 1:
                                is_on_band = True
                                break
                    else:
                        for pos in band_positions:
                            if abs(x_idx - pos) <= 1:
                                is_on_band = True
                                break
                    if is_on_band:
                        pixel = pixels[x_idx, y_idx]
                        # Grayscale images yield an int; RGB(A) yields a tuple.
                        if isinstance(pixel, int):
                            r_val = g_val = b_val = pixel
                            is_rgb = False
                        else:
                            if len(pixel) >= 3:
                                r_val, g_val, b_val = pixel[0], pixel[1], pixel[2]
                                is_rgb = True
                            else:
                                continue
                        # Randomly brighten or darken by the intensity factor.
                        factor = 1.0 + intensity * (1 if random.random() > 0.5 else -1)
                        r_val, g_val, b_val = int(r_val * factor), int(g_val * factor), int(b_val * factor)
                        r_val, g_val, b_val = max(0, min(255, r_val)), max(0, min(255, g_val)), max(0, min(255, b_val))
                        # Write back, preserving any alpha component.
                        if is_rgb:
                            if len(pixel) == 4:
                                pixels[x_idx, y_idx] = (r_val, g_val, b_val, pixel[3])
                            else:
                                pixels[x_idx, y_idx] = (r_val, g_val, b_val)
                        else:
                            pixels[x_idx, y_idx] = r_val
            logger.debug(f"aHash特定干扰完成: {'水平' if is_horizontal else '垂直'}亮度带")
            return result
        except Exception as e:
            logger.error(f"应用aHash特定干扰时出错: {e}")
            logger.error(traceback.format_exc())
            return image
def apply_dhash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
    """
    dHash-specific disruption: invert local gradients in random regions (enhanced).

    Difference-hash encodes the sign of adjacent-pixel differences; nudging
    pixel pairs against their gradient direction can flip hash bits while
    keeping the visual change small.

    Args:
        image: input image
        strength: 'low', 'medium' or 'high'
    Returns:
        Modified copy of the image (the original on failure).
    """
    try:
        # Strength presets: counter-gradient amplitude and region count.
        if strength == "low":
            gradient_strength, regions = 0.08, 2
        elif strength == "high":
            gradient_strength, regions = 0.18, 4
        else:  # medium
            gradient_strength, regions = 0.12, 3
        logger.debug(f"应用dHash特定干扰 (梯度反向) (增强版), 强度:{strength}, 区域数:{regions}, 梯度强度:{gradient_strength:.3f}")
        result = image.copy()
        width, height = result.size
        for _ in range(regions):
            # Pick a random small patch (1/12 .. 1/8 of each dimension).
            rw = random.randint(width // 12, width // 8)
            rh = random.randint(height // 12, height // 8)
            rx = random.randint(0, width - rw)
            ry = random.randint(0, height - rh)
            patch = np.array(result.crop((rx, ry, rx + rw, ry + rh)))
            has_color = patch.ndim == 3
            gray = np.mean(patch, axis=2).astype(np.uint8) if has_color else patch
            # Signed neighbour differences; the last column/row stays zero,
            # matching a zeros-initialised gradient buffer.
            h_grad = np.zeros_like(gray, dtype=np.int16)
            v_grad = np.zeros_like(gray, dtype=np.int16)
            h_grad[:, :-1] = np.diff(gray.astype(np.int16), axis=1)
            v_grad[:-1, :] = np.diff(gray.astype(np.int16), axis=0)
            work = patch.astype(np.float32)
            for yy in range(rh):
                for xx in range(rw):
                    # Push each strong-gradient pixel pair toward each other,
                    # weakening/reversing the local gradient sign. Updates
                    # are in place and order dependent, as before.
                    if xx < rw - 1 and abs(h_grad[yy, xx]) > 5:
                        delta = -h_grad[yy, xx] * gradient_strength
                        if has_color:
                            for ch in range(3):
                                work[yy, xx + 1, ch] = np.clip(work[yy, xx + 1, ch] + delta / 2, 0, 255)
                                work[yy, xx, ch] = np.clip(work[yy, xx, ch] - delta / 2, 0, 255)
                        else:
                            work[yy, xx + 1] = np.clip(work[yy, xx + 1] + delta / 2, 0, 255)
                            work[yy, xx] = np.clip(work[yy, xx] - delta / 2, 0, 255)
                    if yy < rh - 1 and abs(v_grad[yy, xx]) > 5:
                        delta = -v_grad[yy, xx] * gradient_strength
                        if has_color:
                            for ch in range(3):
                                work[yy + 1, xx, ch] = np.clip(work[yy + 1, xx, ch] + delta / 2, 0, 255)
                                work[yy, xx, ch] = np.clip(work[yy, xx, ch] - delta / 2, 0, 255)
                        else:
                            work[yy + 1, xx] = np.clip(work[yy + 1, xx] + delta / 2, 0, 255)
                            work[yy, xx] = np.clip(work[yy, xx] - delta / 2, 0, 255)
            result.paste(Image.fromarray(work.astype(np.uint8)), (rx, ry))
        logger.debug(f"dHash特定干扰完成: 在{regions}个区域应用梯度反向")
        return result
    except Exception as e:
        logger.error(f"应用dHash特定干扰时出错: {e}")
        logger.error(traceback.format_exc())
        return image
def apply_phash_specific_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
    """
    pHash-specific disruption: overlay sinusoidal micro-patterns on the
    spatial blocks that feed the low-frequency DCT coefficients (enhanced).

    pHash is computed from the top-left 8x8 DCT coefficients of a 32x32
    grayscale thumbnail, so perturbing the corresponding source-pixel blocks
    can flip hash bits. The perturbation is computed on the grayscale channel
    and transferred back onto the color channels.

    FIX: a previous revision also computed the actual DCT of the thumbnail
    (``dct(dct(...))``) but never used the result — that dead scipy work has
    been removed. Block sizes are now clamped to >= 1 so images smaller than
    32px are perturbed too (previously a silent no-op).

    Args:
        image: input image
        strength: 'low', 'medium' or 'high'
    Returns:
        Modified image (the original on failure).
    """
    # NOTE(review): scipy is no longer used by this method, but the guard is
    # retained so behavior without scipy (return input unchanged) matches
    # the historical contract — confirm whether callers rely on it.
    if not SCIPY_AVAILABLE:
        logger.warning("Scipy 未安装无法执行pHash专用干扰。请运行 'pip install scipy'")
        return image
    try:
        # Strength presets: pattern amplitude and number of perturbed positions.
        if strength == "low":
            intensity = 0.20
            key_positions_count = 4
        elif strength == "high":
            intensity = 0.40
            key_positions_count = 8
        else:  # medium
            intensity = 0.30
            key_positions_count = 6
        logger.debug(f"应用pHash特定干扰 (定向DCT干扰) (增强版), 强度:{strength}, 密度:{key_positions_count}, 强度因子:{intensity:.2f}")
        gray_image = image.convert('L')
        img_array_np = np.array(gray_image)
        h_img, w_img = img_array_np.shape
        # Low-frequency positions of the 8x8 DCT block, skipping DC at (0, 0).
        key_positions = [(i, j) for i in range(1, 8) for j in range(1, 8)]
        selected_positions = random.sample(key_positions, k=min(len(key_positions), key_positions_count))
        # Map each 32x32-thumbnail cell back to a block of original pixels.
        # max(1, ...) keeps the method effective for images smaller than 32px.
        block_h_size = max(1, h_img // 32)
        block_w_size = max(1, w_img // 32)
        for dct_y, dct_x in selected_positions:
            orig_y = dct_y * block_h_size
            orig_x = dct_x * block_w_size
            pattern_size = min(block_h_size, block_w_size)
            for y_off in range(pattern_size):
                for x_off in range(pattern_size):
                    y = orig_y + y_off
                    x = orig_x + x_off
                    if 0 <= y < h_img and 0 <= x < w_img:
                        # Smooth 2D sinusoid, amplitude scaled by intensity.
                        offset = intensity * 20 * math.sin(2 * math.pi * (y_off / pattern_size)) * \
                                 math.cos(2 * math.pi * (x_off / pattern_size))
                        img_array_np[y, x] = np.clip(img_array_np[y, x] + offset, 0, 255)
        result_img = Image.fromarray(img_array_np.astype(np.uint8))
        if image.mode != 'L':
            # Transfer the grayscale perturbation onto each color channel so
            # the returned image keeps its original mode.
            r_channel, g_channel, b_channel = image.split()[:3]
            diff_array = np.array(ImageChops.difference(gray_image, result_img))
            transfer_factor = 0.8
            new_channels = []
            for channel in (r_channel, g_channel, b_channel):
                arr = np.clip(np.array(channel) + diff_array * transfer_factor, 0, 255).astype(np.uint8)
                new_channels.append(Image.fromarray(arr))
            if image.mode == 'RGBA':
                result_img = Image.merge('RGBA', (*new_channels, image.split()[3]))
            else:
                result_img = Image.merge('RGB', tuple(new_channels))
        logger.debug(f"pHash特定干扰完成: 修改了{len(selected_positions)}个DCT关键位置")
        return result_img
    except Exception as e:
        logger.error(f"应用pHash特定干扰时出错: {e}")
        logger.error(traceback.format_exc())
        return image
def apply_block_based_perturbations(self, image: Image.Image, block_size: int = 16, strength: str = "medium") -> Image.Image:
    """
    Apply an independent, randomly chosen perturbation strategy to each
    block of the image (enhanced version).

    Each block_size x block_size tile is either skipped or perturbed with one
    of: brightness scaling, contrast scaling, hue shift (RGB only), gradient
    flipping, or a tiny dot/line/cross "micro pattern". The per-block variety
    is intended to defeat block-wise perceptual hash matching.

    Args:
        image: input image
        block_size: tile edge length in pixels
        strength: 'low', 'medium' or 'high'
    Returns:
        Perturbed image (the original is returned unchanged on failure).
    """
    try:
        # Strength presets: perturbation amplitude and skip probability.
        if strength == "low":
            factor_range = 0.08  # previously 0.03
            skip_prob = 0.5
        elif strength == "high":
            factor_range = 0.18  # previously 0.06
            skip_prob = 0.2
        else:  # medium
            factor_range = 0.12  # previously 0.045
            skip_prob = 0.35
        logger.debug(f"应用块级混合干扰 (增强版), 块大小:{block_size}, 强度:{strength}, 因子范围:{factor_range:.3f}")
        result = image.copy()
        width, height = image.size
        img_array = np.array(result)
        is_rgb = len(img_array.shape) == 3
        strategies = ['brightness', 'contrast', 'hue_shift', 'gradient_flip', 'micro_pattern', 'skip']
        processed_blocks = 0
        skipped_blocks = 0
        for y_coord in range(0, height, block_size):
            for x_coord in range(0, width, block_size):
                block_w = min(block_size, width - x_coord)
                block_h = min(block_size, height - y_coord)
                # Ignore slivers at the right/bottom image edges.
                if block_w < 4 or block_h < 4:
                    continue
                # Either skip this block, or pick one of the real strategies
                # uniformly at random.
                current_strategy = 'skip' if random.random() < skip_prob else random.choice([s for s in strategies if s != 'skip'])
                # current_block is a *view* into img_array; in-place strategies
                # modify the array directly, others reassign the name.
                if is_rgb:
                    current_block = img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w, :]
                else:
                    current_block = img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w]
                if current_strategy == 'skip':
                    skipped_blocks += 1
                elif current_strategy == 'brightness':
                    # Uniform brightness scaling of the whole block.
                    factor = 1.0 + random.uniform(-factor_range, factor_range)
                    current_block = (current_block.astype(float) * factor).clip(0, 255).astype(np.uint8)
                    processed_blocks += 1
                elif current_strategy == 'contrast':
                    # Scale the deviation from the block mean.
                    factor = 1.0 + random.uniform(-factor_range, factor_range)
                    if is_rgb:
                        mean_val = np.mean(current_block, axis=(0, 1), keepdims=True)
                        current_block = (((current_block.astype(float) - mean_val) * factor) + mean_val).clip(0, 255).astype(np.uint8)
                    else:
                        mean_val = np.mean(current_block)
                        current_block = (((current_block.astype(float) - mean_val) * factor) + mean_val).clip(0, 255).astype(np.uint8)
                    processed_blocks += 1
                elif current_strategy == 'hue_shift' and is_rgb:
                    # Mix a little of the neighbouring channel into each
                    # channel, slightly rotating the hue.
                    r_factor = 1.0 - random.uniform(0, factor_range/2)
                    g_factor = 1.0 - random.uniform(0, factor_range/2)
                    b_factor = 1.0 - random.uniform(0, factor_range/2)
                    r_ch, g_ch, b_ch = current_block[:,:,0], current_block[:,:,1], current_block[:,:,2]
                    current_block[:,:,0] = (r_ch * r_factor + g_ch * (1-r_factor)).clip(0, 255).astype(np.uint8)
                    current_block[:,:,1] = (g_ch * g_factor + b_ch * (1-g_factor)).clip(0, 255).astype(np.uint8)
                    current_block[:,:,2] = (b_ch * b_factor + r_ch * (1-b_factor)).clip(0, 255).astype(np.uint8)
                    processed_blocks += 1
                elif current_strategy == 'gradient_flip':
                    # Weaken strong local gradients in the block's top-left
                    # quadrant by pushing neighbouring pixels toward each
                    # other (order-dependent, in-place updates).
                    if block_w > 2 and block_h > 2:
                        mid_w, mid_h = block_w // 2, block_h // 2
                        pattern_s = min(mid_w, mid_h)
                        for by_idx in range(1, pattern_s-1):
                            for bx_idx in range(1, pattern_s-1):
                                if is_rgb:
                                    curr_val = np.mean(current_block[by_idx, bx_idx, :])
                                    right_val = np.mean(current_block[by_idx, bx_idx+1, :])
                                    below_val = np.mean(current_block[by_idx+1, bx_idx, :])
                                    if abs(curr_val - right_val) > 5:
                                        diff_val = (curr_val - right_val) * factor_range
                                        current_block[by_idx, bx_idx, :] = np.clip(current_block[by_idx, bx_idx, :] - diff_val/2, 0, 255).astype(np.uint8)
                                        current_block[by_idx, bx_idx+1, :] = np.clip(current_block[by_idx, bx_idx+1, :] + diff_val/2, 0, 255).astype(np.uint8)
                                    if abs(curr_val - below_val) > 5:
                                        diff_val = (curr_val - below_val) * factor_range
                                        current_block[by_idx, bx_idx, :] = np.clip(current_block[by_idx, bx_idx, :] - diff_val/2, 0, 255).astype(np.uint8)
                                        current_block[by_idx+1, bx_idx, :] = np.clip(current_block[by_idx+1, bx_idx, :] + diff_val/2, 0, 255).astype(np.uint8)
                                else:
                                    curr_val = float(current_block[by_idx, bx_idx])
                                    right_val = float(current_block[by_idx, bx_idx+1])
                                    below_val = float(current_block[by_idx+1, bx_idx])
                                    if abs(curr_val - right_val) > 5:
                                        diff_val = (curr_val - right_val) * factor_range
                                        current_block[by_idx, bx_idx] = np.clip(current_block[by_idx, bx_idx] - diff_val/2, 0, 255).astype(np.uint8)
                                        current_block[by_idx, bx_idx+1] = np.clip(current_block[by_idx, bx_idx+1] + diff_val/2, 0, 255).astype(np.uint8)
                                    if abs(curr_val - below_val) > 5:
                                        diff_val = (curr_val - below_val) * factor_range
                                        current_block[by_idx, bx_idx] = np.clip(current_block[by_idx, bx_idx] - diff_val/2, 0, 255).astype(np.uint8)
                                        current_block[by_idx+1, bx_idx] = np.clip(current_block[by_idx+1, bx_idx] + diff_val/2, 0, 255).astype(np.uint8)
                    processed_blocks += 1
                elif current_strategy == 'micro_pattern':
                    # Stamp a tiny dot / 3px line / cross near the block
                    # centre on one random channel (or the gray value).
                    pattern_type = random.choice(['dot', 'line', 'cross'])
                    center_y_coord, center_x_coord = block_h // 2, block_w // 2
                    pattern_coords = []
                    if pattern_type == 'dot':
                        pattern_coords = [(center_y_coord, center_x_coord)]
                    elif pattern_type == 'line':
                        if random.choice([True, False]):
                            pattern_coords = [(center_y_coord, cx_val) for cx_val in range(center_x_coord-1, center_x_coord+2)]
                        else:
                            pattern_coords = [(cy_val, center_x_coord) for cy_val in range(center_y_coord-1, center_y_coord+2)]
                    else:
                        # Cross: horizontal line plus the vertical line,
                        # skipping the shared centre pixel.
                        pattern_coords.extend([(center_y_coord, cx_val) for cx_val in range(center_x_coord-1, center_x_coord+2)])
                        pattern_coords.extend([(cy_val, center_x_coord) for cy_val in range(center_y_coord-1, center_y_coord+2) if (cy_val, center_x_coord) not in pattern_coords])
                    pattern_strength = random.uniform(factor_range*50, factor_range*100)
                    for py_coord, px_coord in pattern_coords:
                        if 0 <= py_coord < block_h and 0 <= px_coord < block_w:
                            if is_rgb:
                                target_channel = random.randint(0, 2)
                                if random.choice([True, False]):
                                    current_block[py_coord, px_coord, target_channel] = np.clip(current_block[py_coord, px_coord, target_channel] + pattern_strength, 0, 255).astype(np.uint8)
                                else:
                                    current_block[py_coord, px_coord, target_channel] = np.clip(current_block[py_coord, px_coord, target_channel] - pattern_strength, 0, 255).astype(np.uint8)
                            else:
                                if random.choice([True, False]):
                                    current_block[py_coord, px_coord] = np.clip(current_block[py_coord, px_coord] + pattern_strength, 0, 255).astype(np.uint8)
                                else:
                                    current_block[py_coord, px_coord] = np.clip(current_block[py_coord, px_coord] - pattern_strength, 0, 255).astype(np.uint8)
                    processed_blocks += 1
                # Write the (possibly reassigned) block back; for in-place
                # strategies this is a harmless self-copy.
                if is_rgb:
                    img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w, :] = current_block
                else:
                    img_array[y_coord:y_coord+block_h, x_coord:x_coord+block_w] = current_block
        result = Image.fromarray(img_array)
        logger.debug(f"块级混合干扰完成: 处理了{processed_blocks}个块, 跳过了{skipped_blocks}个块")
        return result
    except Exception as e:
        logger.error(f"应用块级混合干扰时出错: {e}")
        logger.error(traceback.format_exc())
        return image
def apply_strategic_hash_disruption(self, image: Image.Image, strength: str = "medium") -> Image.Image:
    """
    Strategic hash disruption: probabilistically chains the targeted
    perturbations (block-level mixing plus aHash/dHash/pHash attacks).

    Args:
        image: input image
        strength: 'low', 'medium' or 'high'
    Returns:
        Disrupted image (the original on failure).
    """
    try:
        logger.info(f"开始战略性哈希干扰 (强度: {strength})")
        # Snapshot kept only to log whether anything actually changed.
        reference = image.copy()
        # Per-strength application probabilities and block size.
        if strength == "low":
            probs = {'ahash': 0.7, 'dhash': 0.7, 'phash': 0.9, 'block': 0.6}
            block_size = 24
        elif strength == "high":
            probs = {'ahash': 0.9, 'dhash': 0.9, 'phash': 0.95, 'block': 0.8}
            block_size = 16
        else:  # medium
            probs = {'ahash': 0.8, 'dhash': 0.8, 'phash': 0.9, 'block': 0.7}
            block_size = 20
        logger.debug(f"策略概率: aHash={probs['ahash']:.1f}, dHash={probs['dhash']:.1f}, pHash={probs['phash']:.1f}, 块级={probs['block']:.1f}")
        result = image.copy()
        applied_strategies = []
        # Order matters for reproducibility under a fixed random seed:
        # block-level first, then aHash, dHash and finally pHash.
        pipeline = [
            (probs['block'],
             lambda img: self.apply_block_based_perturbations(img, block_size=block_size, strength=strength),
             f"BlockBased({block_size})"),
            (probs['ahash'],
             lambda img: self.apply_ahash_specific_disruption(img, strength),
             "aHash"),
            (probs['dhash'],
             lambda img: self.apply_dhash_specific_disruption(img, strength),
             "dHash"),
            (probs['phash'],
             lambda img: self.apply_phash_specific_disruption(img, strength),
             "pHash"),
        ]
        for prob, step, label in pipeline:
            if random.random() < prob:
                result = step(result)
                applied_strategies.append(label)
        logger.info(f"已应用战略干扰: {', '.join(applied_strategies)}")
        # Best-effort sanity log: did the pipeline modify the image at all?
        try:
            changed_bbox = ImageChops.difference(reference.convert('RGB'), result.convert('RGB')).getbbox()
            if changed_bbox:
                logger.info(f"图像已修改。差异区域: {changed_bbox}")
            else:
                logger.warning("!!!战略干扰似乎未修改图像!!!")
        except Exception as log_e:
            logger.warning(f"无法比较图像差异: {log_e}")
        logger.info(f"战略性哈希干扰完成 (强度: {strength})")
        return result
    except Exception as e:
        logger.error(f"应用战略性哈希干扰时出错: {e}")
        logger.error(traceback.format_exc())
        return image
def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image:
    """Optimized anti-hash pipeline using the layered enhancement strategy.

    Layer 1 applies mild global perturbations (smart crop/resize, low-alpha
    noise overlay, colour-histogram perturbation); layer 2 applies the
    strategic per-hash disruptions on top; finally all metadata is stripped.
    """
    logger.info(f"--- 开始优化抗哈希方法 (强度: {strength}) - 分层增强策略 ---")
    processed_image = image.copy()
    # Per-strength presets: (max crop px, (alpha lo, alpha hi), hist strength).
    # max_crop is currently informational only; cropping strength is handled
    # inside apply_smart_crop_resize.
    presets = {
        "low": (3, (8, 12), 0.03),
        "high": (10, (18, 25), 0.08),
    }
    global_max_crop, alpha_bounds, global_color_hist_strength = presets.get(
        strength, (6, (12, 18), 0.05))
    global_overlay_alpha = random.randint(*alpha_bounds)
    logger.debug(f"分层策略 - 全局扰动参数: strength_for_crop='{strength}' (内部max_crop将按新标准), overlay_alpha={global_overlay_alpha}, color_hist_strength={global_color_hist_strength:.3f}")
    # --- Layer 1: basic global perturbations ---
    logger.info("应用基础全局扰动...")
    processed_image = self.apply_smart_crop_resize(processed_image, strength)
    processed_image = self.apply_overlay_noise(processed_image, alpha=global_overlay_alpha, noise_type='uniform')
    if global_color_hist_strength > 0:  # only apply with a positive strength
        processed_image = self.perturb_color_histogram(processed_image, strength=global_color_hist_strength)
    # --- Layer 2: strategic hash disruption on top of the global layer ---
    logger.info("应用战略性哈希干扰 (各子方法已增强)...")
    processed_image = self.apply_strategic_hash_disruption(processed_image, strength)
    # --- Strip metadata last ---
    processed_image = self.strip_metadata(processed_image)
    logger.info(f"--- 完成优化抗哈希方法 (强度: {strength}) - 分层增强策略 ---")
    return processed_image
def optimized_process_image(
    self,
    image: Image.Image,
    target_ratio: Tuple[int, int],
    add_variation: bool = True,
    seed: Optional[int] = None,
    variation_strength: str = "medium",
    extra_effects: bool = True
) -> Image.Image:
    """Resize/crop an image to ``target_ratio`` and optionally apply
    anti-duplicate-detection variations.

    Args:
        image: source image.
        target_ratio: (w, h) aspect ratio to produce, e.g. (3, 4).
        add_variation: when False, only resize/crop and strip metadata.
        seed: optional RNG seed for reproducible variations; the global
            RNGs are re-seeded from entropy afterwards.
        variation_strength: 'low', 'medium' or 'high'.
        extra_effects: enables the anti-hash pipeline and the extra
            blur/sharpen/border effects.

    Returns:
        The processed image with metadata stripped.
    """
    # Seed both RNGs so the whole variation chain is reproducible.
    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)
    # Strength presets. The RNG call order here is kept stable so seeded
    # runs remain reproducible across revisions.
    if variation_strength == "low":
        brightness_factor = random.uniform(0.97, 1.03)
        contrast_factor = random.uniform(0.97, 1.03)
        saturation_factor = random.uniform(0.97, 1.03)
        max_rotation = 0.5
        border_size = random.randint(0, 1)
    elif variation_strength == "high":
        brightness_factor = random.uniform(0.92, 1.08)
        contrast_factor = random.uniform(0.92, 1.08)
        saturation_factor = random.uniform(0.92, 1.08)
        max_rotation = 2.0
        border_size = random.randint(0, 3)
    else:  # medium
        brightness_factor = random.uniform(0.95, 1.05)
        contrast_factor = random.uniform(0.95, 1.05)
        saturation_factor = random.uniform(0.95, 1.05)
        max_rotation = 1.0
        border_size = random.randint(0, 2)
    # extra_effects directly controls anti-hash plus extra effects; the old
    # per-strength probabilistic gating was deliberately removed.
    use_extra = extra_effects
    # --- Scale to a working size, then crop to the target ratio ---
    width, height = image.size
    current_ratio = width / height
    target_ratio_value = target_ratio[0] / target_ratio[1]
    if current_ratio > target_ratio_value:  # wider than target
        new_height = 1200
        new_width = int(new_height * current_ratio)
    else:  # taller than target
        new_width = 900
        new_height = int(new_width / current_ratio)
    resized_image = image.resize((new_width, new_height), Image.LANCZOS)
    resized_width, resized_height = resized_image.size
    if resized_width / resized_height > target_ratio_value:
        crop_width = int(resized_height * target_ratio_value)
        max_offset = max(1, min(10, (resized_width - crop_width) // 10))
        # Small random horizontal offset hides the crop centre.
        offset = random.randint(-max_offset, max_offset) if add_variation else 0
        crop_x1 = max(0, min((resized_width - crop_width) // 2 + offset, resized_width - crop_width))
        result = resized_image.crop((crop_x1, 0, crop_x1 + crop_width, resized_height))
    else:
        crop_height = int(resized_width / target_ratio_value)
        max_offset = max(1, min(10, (resized_height - crop_height) // 10))
        offset = random.randint(-max_offset, max_offset) if add_variation else 0
        crop_y1 = max(0, min((resized_height - crop_height) // 2 + offset, resized_height - crop_height))
        result = resized_image.crop((0, crop_y1, resized_width, crop_y1 + crop_height))
    if not add_variation:
        logger.info("add_variation=False跳过所有变化和抗哈希处理。")
        if seed is not None:
            random.seed()
            np.random.seed()
        return self.strip_metadata(result)
    logger.info(f"应用基础变化和抗哈希处理 (强度: {variation_strength}, 额外效果: {use_extra})")
    processed_image = result.convert('RGB')
    # Brightness / contrast / saturation jitter (skip near-identity factors).
    if abs(brightness_factor - 1.0) > 0.01:
        processed_image = ImageEnhance.Brightness(processed_image).enhance(brightness_factor)
    if abs(contrast_factor - 1.0) > 0.01:
        processed_image = ImageEnhance.Contrast(processed_image).enhance(contrast_factor)
    if abs(saturation_factor - 1.0) > 0.01:
        processed_image = ImageEnhance.Color(processed_image).enhance(saturation_factor)
    # Slight rotation, medium/high strength only.
    if variation_strength != "low" and abs(max_rotation) > 0.1:
        rotation_angle = random.uniform(-max_rotation, max_rotation)
        if abs(rotation_angle) > 0.1:  # rotate only when it is noticeable
            processed_image = processed_image.rotate(rotation_angle, resample=Image.BICUBIC, expand=False)
    # Anti-hash pipeline.
    if use_extra:
        logger.debug("调用 optimize_anti_hash_methods...")
        processed_image = self.optimize_anti_hash_methods(processed_image, variation_strength)
    else:
        logger.info("use_extra=False跳过 optimize_anti_hash_methods。")
    # Extra effects: sharpen OR blur, then an optional thin dark border.
    if use_extra:
        logger.debug("应用额外效果 (模糊/锐化/边框)...")
        apply_sharpen = random.random() < 0.4
        # Short-circuit keeps the RNG call sequence identical to before.
        apply_blur = not apply_sharpen and random.random() < 0.3
        if apply_sharpen:
            sharpness = 1.2 if variation_strength == "high" else 1.1
            processed_image = ImageEnhance.Sharpness(processed_image).enhance(sharpness)
        elif apply_blur:
            radius = 0.7 if variation_strength == "high" else 0.4
            processed_image = processed_image.filter(ImageFilter.GaussianBlur(radius=radius))
        # Border only on reasonably large images.
        if border_size > 0 and min(processed_image.size) > 300:
            border_color = (
                random.randint(0, 5),
                random.randint(0, 5),
                random.randint(0, 5)
            )
            w, h = processed_image.size
            bordered = Image.new('RGB', (w + border_size*2, h + border_size*2), border_color)
            bordered.paste(processed_image, (border_size, border_size))
            # FIX: the bordered canvas used to be built and then discarded;
            # assign it back so the border actually appears in the output.
            processed_image = bordered
        logger.debug("额外效果应用完成。")
    else:
        logger.info("use_extra=False跳过额外效果。")
    # Strip metadata after all modifications.
    logger.debug("最后调用 strip_metadata 清除元数据。")
    final_image = self.strip_metadata(processed_image)
    # Re-seed the global RNGs from entropy so later callers are unaffected.
    if seed is not None:
        random.seed()
        np.random.seed()
    logger.debug("随机种子已重置。")
    logger.info(f"图像处理完成 (强度: {variation_strength})")
    return final_image
def process_poster_for_notes(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_image_path: str,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "note_{index}.jpg"
) -> List[str]:
    """Create note images for a generated poster.

    Validates that the poster image exists, then delegates the actual work
    to a PosterNotesCreator instance.

    Args:
        run_id: run identifier.
        topic_index: topic index.
        variant_index: variant index.
        poster_image_path: path to the rendered poster image.
        poster_metadata_path: path to the poster metadata file.
        source_image_dir: directory of candidate source images.
        num_additional_images: number of extra images to use.
        output_handler: handler that persists the results.
        output_filename_template: filename pattern for the saved notes.

    Returns:
        List of saved note-image paths; empty list when the poster is missing.
    """
    logger.info(f"开始为海报创建笔记图像: {poster_image_path}")
    # Guard clause: nothing to do without the poster image.
    if not os.path.exists(poster_image_path):
        logger.error(f"海报图像不存在: {poster_image_path}")
        return []
    return PosterNotesCreator(output_handler).create_notes_images(
        run_id,
        topic_index,
        variant_index,
        poster_image_path,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template,
    )
def select_additional_images(
    run_id: str,
    topic_index: int,
    variant_index: int,
    poster_metadata_path: str,
    source_image_dir: str,
    num_additional_images: int,
    output_handler: OutputHandler,
    output_filename_template: str = "additional_{index}.jpg",
    variation_strength: str = "medium",
    extra_effects: bool = True,
    collage_style: str = "grid_2x2"  # grid style by default
) -> List[str]:
    """Select images not used by the poster and build 2x2 grid collages.

    Validates the poster metadata file, then delegates the collage work to
    PosterNotesCreator.create_additional_images.

    Args:
        run_id: run identifier.
        topic_index: topic index.
        variant_index: variant index.
        poster_metadata_path: path to the poster metadata file.
        source_image_dir: directory of candidate source images.
        num_additional_images: number of extra images to produce.
        output_handler: handler that persists the results.
        output_filename_template: filename pattern for saved images.
        variation_strength: variation strength passed through.
        extra_effects: whether to apply extra effects.
        collage_style: collage style name (grid layout).

    Returns:
        List of saved image paths; empty list when the metadata is missing.
    """
    logger.info(f"开始为主题 {topic_index} 变体 {variant_index} 选择额外配图2x2网格风格")
    # Guard clause: metadata is required to know which images were used.
    if not os.path.exists(poster_metadata_path):
        logger.error(f"海报元数据不存在: {poster_metadata_path}")
        return []
    notes_creator = PosterNotesCreator(output_handler)
    return notes_creator.create_additional_images(
        run_id,
        topic_index,
        variant_index,
        poster_metadata_path,
        source_image_dir,
        num_additional_images,
        output_filename_template,
        variation_strength,
        extra_effects,
        collage_style,
    )
def process_directory(directory_path, style=None, target_size=(900, 1200), output_count=1):
    """Build collages from the images in a directory.

    Args:
        directory_path: directory containing the source images.
        style: optional collage style name forwarded to
            ImageCollageCreator.create_collage_with_style.
        target_size: (width, height) of each collage, default (900, 1200).
        output_count: number of collages to attempt, default 1.

    Returns:
        tuple: (list of collage PIL.Image objects, list of per-collage lists
        of source image filenames). Returns ([], []) when the directory is
        missing, contains no images, or cannot be listed.
    """
    logging.info(f"处理目录中的图片并创建 {output_count} 个拼贴图: {directory_path}")
    # FIX: validate inputs before doing any work — previously an
    # ImageCollageCreator was constructed even for a missing directory.
    if not os.path.exists(directory_path):
        logging.error(f"目录不存在: {directory_path}")
        return [], []
    # Supported image formats.
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
    try:
        all_files = os.listdir(directory_path)
        all_images_names = [f for f in all_files
                            if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(directory_path, f))]
        if not all_images_names:
            logging.error(f"目录中没有有效的图片文件: {directory_path}")
            return [], []
        logging.info(f"目录中找到 {len(all_images_names)} 个有效图片文件")
    except Exception as e:
        logging.exception(f"列出目录内容时出错: {e}")
        return [], []
    collage_creator = ImageCollageCreator()
    collage_images = []
    used_image_names = []  # per-collage lists of the filenames used
    # Attempt to build the requested number of collages; failures of one
    # attempt do not abort the remaining ones.
    for i in range(output_count):
        try:
            collage, selected_images_names = collage_creator.create_collage_with_style(
                directory_path,
                style=style,
                target_size=target_size
            )
            if collage:
                collage_images.append(collage)
                used_image_names.append(selected_images_names)
                logging.info(f"成功创建拼贴图 {i+1}/{output_count}")
            else:
                logging.error(f"无法创建拼贴图 {i+1}/{output_count}")
        except Exception as e:
            logging.exception(f"创建拼贴图 {i+1}/{output_count} 时发生异常: {e}")
    logging.info(f"已处理目录 {directory_path},成功创建 {len(collage_images)}/{output_count} 个拼贴图")
    return collage_images, used_image_names
def find_main_subject(image):
    """Locate the main subject region of an image.

    Placeholder: always returns None.
    """
    # TODO(review): the original implementation was elided here ("keep the
    # existing implementation" marker) — restore it before relying on this.
    return None
def adjust_image(image, contrast=1.0, saturation=1.0):
    """Adjust image contrast and saturation.

    Placeholder: always returns None.
    """
    # TODO(review): the original implementation was elided here ("keep the
    # existing implementation" marker) — restore it before relying on this.
    return None
def smart_crop_and_resize(image, target_aspect_ratio):
    """Crop and resize an image to the target aspect ratio.

    Placeholder: always returns None.
    """
    # TODO(review): the original implementation was elided here ("keep the
    # existing implementation" marker) — restore it before relying on this.
    return None
def main():
    """Demo entry point exercising ImageCollageCreator and process_directory."""
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s')
    # Sample directory — adjust to a real image folder before running.
    test_directory = "/root/autodl-tmp/sanming_img/modify/古田会议旧址"
    logging.info(f"测试目录: {test_directory}")
    # Approach 1: the process_directory helper (recommended for callers).
    logging.info("方法 1: 使用 process_directory 函数生成拼贴图...")
    collages_via_function, names_via_function = process_directory(
        directory_path=test_directory,
        target_size=(900, 1200),  # default 3:4 ratio
        output_count=2  # create two distinct collages
    )
    if collages_via_function:
        logging.info(f"成功创建了 {len(collages_via_function)} 张拼贴图 (使用 process_directory)")
        # Optional: persist the collages to disk.
        for idx, collage_img in enumerate(collages_via_function):
            save_path = f"/tmp/collage_method1_{idx}.png"
            collage_img.save(save_path)
            logging.info(f"拼贴图已保存到: {save_path}")
    else:
        logging.error("使用 process_directory 创建拼贴图失败")
    # Approach 2: drive ImageCollageCreator directly for fine-grained control.
    logging.info("方法 2: 直接使用 ImageCollageCreator 类...")
    creator = ImageCollageCreator()
    # Available styles: grid_2x2, asymmetric, filmstrip, overlap, mosaic,
    # fullscreen, vertical_stack.
    styles_to_try = ["grid_2x2", "overlap", "mosaic"]
    collages_via_class = []
    for style_name in styles_to_try:
        logging.info(f"尝试使用样式: {style_name}")
        collage_img, picked_names = creator.create_collage_with_style(
            input_dir=test_directory,
            style=style_name,
            target_size=(800, 1000)  # custom size
        )
        if collage_img:
            collages_via_class.append(collage_img)
            # Optional: persist the collage to disk.
            save_path = f"/tmp/collage_method2_{style_name}.png"
            collage_img.save(save_path)
            logging.info(f"使用样式 '{style_name}' 的拼贴图已保存到: {save_path}")
        else:
            logging.error(f"使用样式 '{style_name}' 创建拼贴图失败")
    logging.info(f"总共成功创建了 {len(collages_via_class)} 张拼贴图 (使用 ImageCollageCreator)")
    # Summarise both approaches.
    logging.info("===== 拼贴图创建测试完成 =====")
    logging.info(f"方法 1 (process_directory): {len(collages_via_function)} 张拼贴图")
    logging.info(f"方法 2 (直接使用 ImageCollageCreator): {len(collages_via_class)} 张拼贴图")
if __name__ == "__main__":
main()