改善了海报问题

This commit is contained in:
jinye_huang 2025-04-22 21:26:56 +08:00
parent 0bdc8d9ae9
commit a058a173de
13 changed files with 585 additions and 187 deletions

View File

@ -41,6 +41,12 @@ pip install numpy pandas opencv-python pillow openai
- `contentGen.py`: **内容处理器**: 对 AI 生成的原始推文内容进行结构化处理,提取适用于海报的元素。 - `contentGen.py`: **内容处理器**: 对 AI 生成的原始推文内容进行结构化处理,提取适用于海报的元素。
- `posterGen.py`: **海报生成器**: 负责将图片和文字元素组合生成最终的海报图片,处理字体、布局等。 - `posterGen.py`: **海报生成器**: 负责将图片和文字元素组合生成最终的海报图片,处理字体、布局等。
- `simple_collage.py`: **图片拼贴工具**: 提供图片预处理和拼贴功能。 - `simple_collage.py`: **图片拼贴工具**: 提供图片预处理和拼贴功能。
- `ImageCollageCreator` 类:核心拼贴图实现类,提供多种拼贴样式(如标准网格、非对称布局、胶片条、重叠风格等)。
- `process_directory` 函数:对外接口,接收图片目录路径和参数,使用 `ImageCollageCreator` 创建一组拼贴图。
- 支持多种拼贴样式:标准 2x2 网格、非对称布局、胶片条、重叠效果、马赛克风格等。
- 提供图像增强:自动调整对比度、亮度和饱和度,使拼贴图更加美观。
- 智能处理不同大小、格式的图片,自动尝试加载所有支持的图片格式(.jpg, .jpeg, .png, .bmp
- 错误处理与恢复:当部分图片加载失败时,会自动尝试使用其他可用图片代替。
- `utils/`: 工具与辅助模块 - `utils/`: 工具与辅助模块
- `resource_loader.py`: **资源加载器**: 负责加载项目所需的各种**原始**资源文件。 - `resource_loader.py`: **资源加载器**: 负责加载项目所需的各种**原始**资源文件。
- `prompt_manager.py`: **提示词管理器**: **集中管理**不同阶段提示词的构建逻辑(**已修正内容生成提示词构建逻辑正确区分选题JSON中的文件名和描述性文本**)。 - `prompt_manager.py`: **提示词管理器**: **集中管理**不同阶段提示词的构建逻辑(**已修正内容生成提示词构建逻辑正确区分选题JSON中的文件名和描述性文本**)。
@ -341,3 +347,34 @@ This refactoring makes it straightforward to add new output handlers in the futu
* `stream_chunk_timeout` (可选, 默认 60): 处理流式响应时,允许的两个数据块之间的最大等待时间(秒),用于防止流长时间挂起。 * `stream_chunk_timeout` (可选, 默认 60): 处理流式响应时,允许的两个数据块之间的最大等待时间(秒),用于防止流长时间挂起。
项目提供了一个示例配置文件 `example_config.json`,请务必复制并修改: 项目提供了一个示例配置文件 `example_config.json`,请务必复制并修改:
## 图片目录要求
为确保海报生成功能正常工作,请按照以下结构组织图片目录:
1. **基础图片目录**:在 `poster_gen_config.json` 中设置 `image_base_dir` 参数指向基础图片目录。
2. **原始照片目录**:在基础目录下创建 `相机` 子目录(或通过 `camera_image_subdir` 配置),包含原始照片。
3. **修改图片目录**:在基础目录下创建 `modify` 子目录(或通过 `modify_image_subdir` 配置),并其中为每个景点创建子目录:
```
image_base_dir/
├── 相机/
│ ├── 景点1/
│ │ ├── 照片1.jpg
│ │ ├── 照片2.jpg
│ │ └── ...
│ └── 景点2/
│ └── ...
└── modify/
├── 景点1/
│ ├── 图片1.jpg
│ ├── 图片2.jpg
│ └── ... (至少4-9张图片建议多于9张)
└── 景点2/
└── ...
```
**注意事项**
- 每个景点的 `modify` 子目录中至少需要 4 张图片,建议包含 9 张或更多图片以保证拼贴效果多样性。
- 图片应当质量良好,清晰、色彩丰富,尺寸适中(过大或过小的图片都可能导致处理问题)。
- 支持的图片格式为 JPG (.jpg, .jpeg)、PNG (.png) 和 BMP (.bmp)。
- 确保图片文件没有损坏且可以被 PIL 库正常打开。

View File

@ -47,9 +47,9 @@
"object": "...", "object": "...",
"product": "...", "product": "...",
"product_logic": "...", "product_logic": "...",
"style": "历史文化风文案提示词.txt", "style": "...",
"style_logic": "...", "style_logic": "...",
"target_audience": "文化爱好者文旅需求.txt", "target_audience": "...",
"target_audience_logic": "..." "target_audience_logic": "..."
} }
] ]

View File

@ -55,33 +55,40 @@ class AI_Agent():
timeout=self.timeout timeout=self.timeout
) )
try: # try:
self.encoding = tiktoken.encoding_for_model(self.model_name) # self.encoding = tiktoken.encoding_for_model(self.model_name)
except KeyError: # except KeyError:
logging.warning(f"Encoding for model '{self.model_name}' not found. Using 'cl100k_base' encoding.") # logging.warning(f"Encoding for model '{self.model_name}' not found. Using 'cl100k_base' encoding.")
self.encoding = tiktoken.get_encoding("cl100k_base") # self.encoding = tiktoken.get_encoding("cl100k_base")
def generate_text(self, system_prompt, user_prompt, temperature, top_p, presence_penalty): def generate_text(self, system_prompt, user_prompt, temperature, top_p, presence_penalty):
"""生成文本内容并返回完整响应和token估计值""" """生成文本内容并返回完整响应和token估计值"""
logging.info(f"Generating text with model: {self.model_name}, temp={temperature}, top_p={top_p}, presence_penalty={presence_penalty}") logging.info("Starting text generation process...")
logging.debug(f"System Prompt (first 100 chars): {system_prompt[:100]}...") # logging.debug(f"System Prompt (first 100): {system_prompt[:100]}...")
logging.debug(f"User Prompt (first 100 chars): {user_prompt[:100]}...") # logging.debug(f"User Prompt (first 100): {user_prompt[:100]}...") # Avoid logging potentially huge prompts
logging.info(f"Generation Params: temp={temperature}, top_p={top_p}, presence_penalty={presence_penalty}")
time.sleep(random.random())
retry_count = 0 retry_count = 0
max_retry_wait = 10 max_retry_wait = 10 # Max wait time between retries
full_response = ""
while retry_count <= self.max_retries: while retry_count <= self.max_retries:
call_start_time = None # Initialize start time
try: try:
logging.info(f"Attempting API call (try {retry_count + 1}/{self.max_retries + 1})") # --- Added Logging ---
user_prompt_size = len(user_prompt)
logging.info(f"Attempt {retry_count + 1}/{self.max_retries + 1}: Preparing API request. User prompt size: {user_prompt_size} chars.")
call_start_time = time.time()
# --- End Added Logging ---
response = self.client.chat.completions.create( response = self.client.chat.completions.create(
model=self.model_name, model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, messages=[{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}], {"role": "user", "content": user_prompt}],
temperature=temperature, temperature=temperature,
top_p=top_p, top_p=top_p,
presence_penalty=presence_penalty, presence_penalty=presence_penalty,
stream=True, stream=False, # Ensure this is False for non-streaming method
max_tokens=8192, max_tokens=8192,
timeout=self.timeout, timeout=self.timeout,
extra_body={ extra_body={
@ -89,35 +96,35 @@ class AI_Agent():
}, },
) )
full_response = "" # --- Added Logging ---
stream_timed_out = False call_end_time = time.time()
try: logging.info(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API request function returned successfully after {call_end_time - call_start_time:.2f} seconds.")
for chunk in response: # --- End Added Logging ---
if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].finish_reason == "stop":
break
# Successfully finished stream
break
except Exception as stream_err:
logging.warning(f"Exception during stream processing: {stream_err}")
stream_timed_out = True
if stream_timed_out: if response.choices and response.choices[0].message:
if len(full_response) > 100: full_response = response.choices[0].message.content
logging.warning(f"Stream interrupted, but received {len(full_response)} characters. Using partial content.") logging.info(f"Received successful response. Content length: {len(full_response)} chars.")
break break # Success, exit retry loop
else:
retry_count += 1 logging.warning("API response structure unexpected or empty content.")
full_response = "[Error: Empty or invalid response structure]"
# Decide if this specific case should retry or fail immediately
retry_count += 1 # Example: Treat as retryable
if retry_count <= self.max_retries: if retry_count <= self.max_retries:
wait_time = min(2 ** retry_count + random.random(), max_retry_wait) wait_time = min(2 ** retry_count + random.random(), max_retry_wait)
logging.warning(f"Stream error/timeout. Waiting {wait_time:.2f}s before retry ({retry_count}/{self.max_retries})...") logging.warning(f"Retrying due to unexpected response structure ({retry_count}/{self.max_retries}), waiting {wait_time:.2f}s...")
time.sleep(wait_time) time.sleep(wait_time)
continue continue
except (APITimeoutError, APIConnectionError, RateLimitError, APIStatusError) as e: except (APITimeoutError, APIConnectionError, RateLimitError, APIStatusError) as e:
# --- Added Logging ---
if call_start_time:
call_fail_time = time.time()
logging.warning(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API call failed/timed out after {call_fail_time - call_start_time:.2f} seconds.")
else:
logging.warning(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API call failed before or during initiation.")
# --- End Added Logging ---
logging.warning(f"API Error occurred: {e}") logging.warning(f"API Error occurred: {e}")
should_retry = False should_retry = False
if isinstance(e, (APITimeoutError, APIConnectionError, RateLimitError)): if isinstance(e, (APITimeoutError, APIConnectionError, RateLimitError)):

View File

@ -6,6 +6,7 @@ import cv2
import time import time
import random import random
import json import json
import logging
class ContentGenerator: class ContentGenerator:
def __init__(self, def __init__(self,
@ -114,7 +115,70 @@ class ContentGenerator:
返回: 返回:
分割后的json内容 分割后的json内容
""" """
return json.loads(content.split("```json")[1].split("```")[0]) try:
# 首先尝试直接解析整个内容,以防已经是干净的 JSON
try:
return json.loads(content)
except json.JSONDecodeError:
pass # 不是干净的 JSON继续处理
# 常规模式:查找 ```json 和 ``` 之间的内容
if "```json" in content:
json_str = content.split("```json")[1].split("```")[0].strip()
try:
return json.loads(json_str)
except json.JSONDecodeError as e:
print(f"常规格式解析失败: {e}, 尝试其他方法")
# 备用模式1查找连续的 { 开头和 } 结尾的部分
import re
json_pattern = r'(\[.*?\])'
json_matches = re.findall(json_pattern, content, re.DOTALL)
if json_matches:
for match in json_matches:
try:
result = json.loads(match)
if isinstance(result, list) and len(result) > 0:
return result
except:
continue
# 备用模式2查找 { 开头 和 } 结尾,并尝试解析
content = content.strip()
square_bracket_start = content.find('[')
square_bracket_end = content.rfind(']')
if square_bracket_start != -1 and square_bracket_end != -1:
potential_json = content[square_bracket_start:square_bracket_end + 1]
try:
return json.loads(potential_json)
except:
print("尝试提取方括号内容失败")
# 最后一种尝试:查找所有可能的 JSON 结构并尝试解析
json_structures = re.findall(r'({.*?})', content, re.DOTALL)
if json_structures:
items = []
for i, struct in enumerate(json_structures):
try:
item = json.loads(struct)
# 验证结构包含预期字段
if 'main_title' in item and ('texts' in item or 'index' in item):
items.append(item)
except:
continue
if items:
return items
# 都失败了,打印错误并引发异常
print(f"无法解析内容,返回原始文本: {content[:200]}...")
raise ValueError("无法从响应中提取有效的 JSON 格式")
except Exception as e:
print(f"解析内容时出错: {e}")
print(f"原始内容: {content[:200]}...") # 仅显示前200个字符
raise e
def generate_posters(self, poster_num, tweet_content, system_prompt=None, max_retries=3): def generate_posters(self, poster_num, tweet_content, system_prompt=None, max_retries=3):
""" """
@ -292,27 +356,64 @@ class ContentGenerator:
def run(self, info_directory, poster_num, tweet_content): def run(self, info_directory, poster_num, tweet_content):
""" """
运行完整的海报生成流程 运行海报内容生成流程并返回生成的配置数据
参数: 参数:
info_directory: 信息目录 info_directory: 信息目录路径列表 (e.g., ['/path/to/description.txt'])
poster_num: 海报数量 poster_num: 需要生成的海报配置数量
tweet_content: 推文内容 tweet_content: 用于生成内容的推文/文章内容
返回: 返回:
结果保存路径 list | dict | None: 生成的海报配置数据 (通常是列表)如果生成或解析失败则返回 None
""" """
## 读取资料文件
self.load_infomation(info_directory) self.load_infomation(info_directory)
## 生成海报内容 # Generate the raw string response from AI
full_response = self.generate_posters(poster_num, tweet_content) full_response = self.generate_posters(poster_num, tweet_content)
if self.output_dir: # Check if generation failed (indicated by return code 404 or other markers)
## 保存结果 if full_response == 404 or not isinstance(full_response, str) or not full_response.strip():
return self.save_result(full_response) logging.error("Poster content generation failed or returned empty response.")
else: return None
return full_response
# Extract the JSON data from the raw response string
try:
result_data = self.split_content(full_response) # This should return the list/dict
# 验证结果数据格式
if isinstance(result_data, list):
for i, item in enumerate(result_data):
if not isinstance(item, dict):
logging.warning(f"配置项 {i+1} 不是字典格式: {item}")
continue
# 检查并确保必需字段存在
if 'main_title' not in item:
item['main_title'] = f"景点标题 {i+1}"
logging.warning(f"配置项 {i+1} 缺少 main_title 字段,已添加默认值")
if 'texts' not in item:
item['texts'] = ["景点特色", "游玩体验"]
logging.warning(f"配置项 {i+1} 缺少 texts 字段,已添加默认值")
logging.info(f"成功生成并解析海报配置数据,包含 {len(result_data)} 个项目")
else:
logging.warning(f"生成的配置数据不是列表格式: {type(result_data)}")
return result_data # Return the actual data
except Exception as e:
logging.exception(f"Failed to parse JSON from AI response in ContentGenerator: {e}\nRaw Response:\n{full_response[:500]}...") # Log error and partial response
# 失败后尝试创建一个默认配置
logging.info("创建默认海报配置数据")
default_configs = []
for i in range(poster_num):
default_configs.append({
"index": i + 1,
"main_title": f"景点风光 {i+1}",
"texts": ["自然美景", "人文体验"]
})
return default_configs
def set_temperature(self, temperature): def set_temperature(self, temperature):
self.temperature = temperature self.temperature = temperature

View File

@ -164,13 +164,47 @@ class PosterGenerator:
return os.path.join(font_dir, "华康海报体简.ttc") return os.path.join(font_dir, "华康海报体简.ttc")
def create_base_layer(self, image_path, target_size): def create_base_layer(self, image_path, target_size):
"""创建底层(图片层)""" """创建底层(图片层)
Args:
image_path: 可以是图片文件路径字符串也可以是已加载的 PIL Image 对象
target_size: 目标图片尺寸 (width, height)
Returns:
调整大小后的 PIL Image 对象
"""
try: try:
base_image = Image.open(image_path).convert('RGBA') # 检查输入类型
if isinstance(image_path, Image.Image):
# 如果已经是 PIL Image 对象
print("输入已是 PIL Image 对象,无需加载")
base_image = image_path.convert('RGBA')
print(f"图像原始尺寸: {base_image.size}")
else:
# 否则,作为路径处理
# 先验证图片路径
if not image_path:
raise ValueError("图片路径为空")
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片文件不存在: {image_path}")
print(f"尝试加载底图: {image_path}")
base_image = Image.open(image_path).convert('RGBA')
print(f"底图加载成功,原始尺寸: {base_image.size}")
# 调整尺寸
base_image = base_image.resize(target_size, Image.Resampling.LANCZOS) base_image = base_image.resize(target_size, Image.Resampling.LANCZOS)
print(f"底图调整尺寸完成: {target_size}")
return base_image return base_image
except FileNotFoundError as e:
print(f"创建底层失败: {e}")
print(f"当前工作目录: {os.getcwd()}")
return Image.new('RGBA', target_size, (255, 255, 255, 255))
except Exception as e: except Exception as e:
print(f"创建底层失败: {e}") print(f"创建底层失败: {e}")
traceback.print_exc()
return Image.new('RGBA', target_size, (255, 255, 255, 255)) return Image.new('RGBA', target_size, (255, 255, 255, 255))
def add_frame(self, image, target_size): def add_frame(self, image, target_size):
@ -297,12 +331,15 @@ class PosterGenerator:
self.selected_effect = "文字蓝色立体效果" self.selected_effect = "文字蓝色立体效果"
print(f"使用文字效果: {self.selected_effect}") print(f"使用文字效果: {self.selected_effect}")
# 如果没有文字数据,使用默认值 # 检查文字数据
if text_data is None: if text_data is None:
text_data = {'title': '泰宁县 甘露岩寺'} print("警告: 未提供文本数据,使用默认文本")
text_data = {'title': '旅游景点', 'subtitle': '', 'additional_texts': []}
print(f"处理文本数据: {text_data}")
# 1. 处理主标题 # 1. 处理主标题
if hasattr(self, 'title_area') and 'title' in text_data: if hasattr(self, 'title_area') and 'title' in text_data and text_data['title']:
font_path = self._get_font_path() font_path = self._get_font_path()
title = text_data['title'] title = text_data['title']
@ -321,6 +358,8 @@ class PosterGenerator:
# 打印调试信息 # 打印调试信息
self._print_text_debug_info("主标题", font, text_width, x, y, font_path) self._print_text_debug_info("主标题", font, text_width, x, y, font_path)
print(f"- 主标题颜色: 柠檬黄色 RGB(255, 250, 55)") print(f"- 主标题颜色: 柠檬黄色 RGB(255, 250, 55)")
else:
print("警告: 无法处理主标题可能缺少标题数据或title_area未定义")
# 2. 处理副标题(如果有) # 2. 处理副标题(如果有)
if hasattr(self, 'title_area') and 'subtitle' in text_data and text_data['subtitle']: if hasattr(self, 'title_area') and 'subtitle' in text_data and text_data['subtitle']:
@ -350,17 +389,28 @@ class PosterGenerator:
# 3. 处理额外文本(如果有) # 3. 处理额外文本(如果有)
if 'additional_texts' in text_data and text_data['additional_texts']: if 'additional_texts' in text_data and text_data['additional_texts']:
# 打印接收到的额外文本
print(f"接收到额外文本数据: {text_data['additional_texts']}")
# 过滤掉空文本项 # 过滤掉空文本项
additional_texts = [item for item in text_data['additional_texts'] if item.get('text')] valid_additional_texts = []
for item in text_data['additional_texts']:
if isinstance(item, dict) and 'text' in item and item['text']:
valid_additional_texts.append(item)
elif isinstance(item, str) and item:
# 如果是字符串,转换为字典格式
valid_additional_texts.append({"text": item, "position": "bottom", "size_factor": 0.5})
if additional_texts and hasattr(self, 'title_area'): print(f"有效额外文本项: {len(valid_additional_texts)}")
if valid_additional_texts and hasattr(self, 'title_area'):
# 获取主标题的字体大小 # 获取主标题的字体大小
main_title_font_size = font.size main_title_font_size = font.size if 'font' in locals() else 48 # 默认字体大小
# 使用固定字体 # 使用固定字体
specific_font_path = os.path.join("/root/autodl-tmp/poster_baseboard_0403/font", "华康海报体简.ttc") specific_font_path = os.path.join("/root/autodl-tmp/poster_baseboard_0403/font", "华康海报体简.ttc")
if not os.path.isfile(specific_font_path): if not os.path.isfile(specific_font_path):
specific_font_path = font_path specific_font_path = font_path if 'font_path' in locals() else self._get_font_path()
# 计算额外文本在屏幕上的位置 # 计算额外文本在屏幕上的位置
height = target_size[1] height = target_size[1]
@ -376,7 +426,7 @@ class PosterGenerator:
max_text_width = width - (safe_margin_x * 2) max_text_width = width - (safe_margin_x * 2)
# 总文本行数 # 总文本行数
total_lines = len(additional_texts) total_lines = len(valid_additional_texts)
line_height = extra_text_height // total_lines if total_lines > 0 else 0 line_height = extra_text_height // total_lines if total_lines > 0 else 0
print(f"额外文本区域: y={extra_text_y_start}, 高度={extra_text_height}, 每行高度={line_height}") print(f"额外文本区域: y={extra_text_y_start}, 高度={extra_text_height}, 每行高度={line_height}")
@ -384,11 +434,16 @@ class PosterGenerator:
print(f"文本颜色: 统一白色") print(f"文本颜色: 统一白色")
# 渲染每一行文本 # 渲染每一行文本
for i, text_item in enumerate(additional_texts): for i, text_item in enumerate(valid_additional_texts):
item_text = text_item['text'] item_text = text_item['text']
# 设置字体大小为主标题的0.8倍 # 检查文本内容
size_factor = 0.8 if not item_text:
print(f"警告: 额外文本项 {i+1} 文本为空,跳过")
continue
# 设置字体大小为主标题的0.8倍或使用指定的size_factor
size_factor = text_item.get('size_factor', 0.8)
font_size = int(main_title_font_size * size_factor) font_size = int(main_title_font_size * size_factor)
text_font = ImageFont.truetype(specific_font_path, font_size) text_font = ImageFont.truetype(specific_font_path, font_size)
@ -406,8 +461,20 @@ class PosterGenerator:
text_bbox = draw.textbbox((0, 0), item_text, font=text_font) text_bbox = draw.textbbox((0, 0), item_text, font=text_font)
text_height = text_bbox[3] - text_bbox[1] text_height = text_bbox[3] - text_bbox[1]
# 计算垂直位置 - 在分配的空间内居中 # 获取位置参数
line_y = extra_text_y_start + (i * line_height) + ((line_height - text_height) // 2) position = text_item.get('position', 'bottom')
# 根据位置设置垂直位置
if position == 'top':
line_y = int(height * 0.05) + (i * line_height)
elif position == 'middle':
line_y = int(height * 0.45) + (i * line_height)
else: # position == 'bottom' 或其他
# 在底部区域使用更大的垂直间距比如整个海报高度的65%到85%
bottom_start = int(height * 0.65)
bottom_height = int(height * 0.2)
bottom_line_height = bottom_height // total_lines if total_lines > 0 else 0
line_y = bottom_start + (i * bottom_line_height)
# 水平居中位置 # 水平居中位置
line_x = (width - text_width) // 2 line_x = (width - text_width) // 2
@ -425,6 +492,8 @@ class PosterGenerator:
print(f"- 文本颜色: 白色") print(f"- 文本颜色: 白色")
print(f"- 字体大小: {font_size}px (主标题的{size_factor:.2f}倍)") print(f"- 字体大小: {font_size}px (主标题的{size_factor:.2f}倍)")
print(f"- 位置: x={line_x}, y={line_y}") print(f"- 位置: x={line_x}, y={line_y}")
else:
print("无法处理额外文本没有有效的额外文本项或title_area未定义")
return text_layer return text_layer
except Exception as e: except Exception as e:
@ -590,16 +659,84 @@ class PosterGenerator:
draw.text((x, y), text, font=font, fill=text_color) draw.text((x, y), text, font=font, fill=text_color)
def _print_text_debug_info(self, text_type, font, text_width, x, y, font_path): def _print_text_debug_info(self, text_type, font, text_width, x, y, font_path):
pass """打印文本调试信息"""
# print(f" {text_type}: Font={os.path.basename(font_path)}, Size={font.size}, Width={text_width:.0f}, Pos=({x:.0f}, {y:.0f})") print(f"- {text_type} 字体大小: {font.size}px")
print(f"- {text_type} 文本宽度: {text_width}px")
print(f"- {text_type} 位置: x={x}, y={y}")
print(f"- {text_type} 使用字体: {os.path.basename(font_path)}")
def create_poster(self, image_path, text_data): def add_stickers(self, poster_image):
"""
在海报上添加装饰性贴纸
Args:
poster_image: 要添加贴纸的海报图像对象
Returns:
添加了贴纸的海报图像对象
"""
if not hasattr(self, 'sticker_files') or not self.sticker_files:
print("没有可用的贴纸素材,跳过贴纸添加")
return poster_image
try:
# 获取海报尺寸
width, height = poster_image.size
# 决定是否添加贴纸50%概率)
if random.random() < 0.5:
print("随机决定不添加贴纸")
return poster_image
# 随机决定添加1-3个贴纸
sticker_count = random.randint(1, 3)
print(f"准备添加 {sticker_count} 个贴纸")
# 创建一个新图层用于合成
result_image = poster_image.copy()
for i in range(sticker_count):
# 随机选择一个贴纸
sticker_file = random.choice(self.sticker_files)
sticker_path = os.path.join(self.sticker_dir, sticker_file)
# 加载贴纸图像
sticker = Image.open(sticker_path).convert('RGBA')
# 调整贴纸大小原始尺寸的10%-30%
sticker_size_factor = random.uniform(0.1, 0.3)
new_width = int(width * sticker_size_factor)
new_height = int(new_width * sticker.height / sticker.width) # 保持纵横比
sticker = sticker.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 随机选择贴纸位置(避开中央区域)
margin = int(width * 0.1) # 边缘区域的10%
# 生成随机位置(避开中间区域)
if random.random() < 0.5: # 左/右边缘
x = random.randint(margin, int(width * 0.25)) if random.random() < 0.5 else random.randint(int(width * 0.75), width - new_width - margin)
y = random.randint(margin, height - new_height - margin)
else: # 上/下边缘
x = random.randint(margin, width - new_width - margin)
y = random.randint(margin, int(height * 0.25)) if random.random() < 0.5 else random.randint(int(height * 0.75), height - new_height - margin)
# 将贴纸粘贴到结果图像上
result_image.paste(sticker, (x, y), sticker)
print(f"添加贴纸 {i+1}: {sticker_file}, 大小: {new_width}x{new_height}, 位置: ({x}, {y})")
return result_image
except Exception as e:
print(f"添加贴纸失败: {e}")
traceback.print_exc()
return poster_image
def create_poster(self, image_input, text_data):
""" """
Creates a poster by combining the base image, frame (optional), stickers (optional), Creates a poster by combining the base image, frame (optional), stickers (optional),
and text layers. and text layers.
Args: Args:
image_path: Path to the base image (e.g., the generated collage). image_input: 底图输入可以是图片路径字符串或 PIL Image 对象
text_data: Dictionary containing text information ( text_data: Dictionary containing text information (
{ {
'title': 'Main Title Text', 'title': 'Main Title Text',
@ -614,13 +751,15 @@ class PosterGenerator:
target_size = (900, 1200) # TODO: Make target_size a parameter? target_size = (900, 1200) # TODO: Make target_size a parameter?
print(f"\n--- Creating Poster --- ") print(f"\n--- Creating Poster --- ")
print(f"Input Image: {image_path}") if isinstance(image_input, Image.Image):
print(f"Input: PIL Image 对象,尺寸 {image_input.size}")
else:
print(f"Input Image: {image_input}")
print(f"Text Data: {text_data}") print(f"Text Data: {text_data}")
# print(f"Output Name: {output_name}") # output_name is removed
try: try:
# 1. 创建底层(图片) # 1. 创建底层(图片)
base_layer = self.create_base_layer(image_path, target_size) base_layer = self.create_base_layer(image_input, target_size)
if not base_layer: if not base_layer:
raise ValueError("Failed to create base layer.") raise ValueError("Failed to create base layer.")
print("Base layer created.") print("Base layer created.")
@ -700,7 +839,20 @@ def main():
} }
# 处理目录中的所有图片 # 处理目录中的所有图片
img_path = "/root/autodl-tmp/poster_baseboard_0403/output_collage/random_collage_1_collage.png" img_path = "/root/autodl-tmp/poster_baseboard_0403/output_collage/random_collage_1_collage.png"
generator.create_poster(img_path, text_data)
# 先加载图片,然后传递 PIL Image 对象
try:
# 先加载图片
collage_img = Image.open(img_path).convert('RGBA')
print(f"已加载拼贴图: {img_path}, 尺寸: {collage_img.size}")
# 传递图片对象而不是路径
generator.create_poster(collage_img, text_data)
except Exception as e:
print(f"加载或处理图片时出错: {e}")
traceback.print_exc()
# 失败时回退到使用路径
generator.create_poster(img_path, text_data)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -5,6 +5,7 @@ import traceback
import math import math
from pathlib import Path from pathlib import Path
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageOps from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageOps
import logging # Import logging module
class ImageCollageCreator: class ImageCollageCreator:
def __init__(self): def __init__(self):
@ -137,6 +138,7 @@ class ImageCollageCreator:
def create_collage_with_style(self, input_dir, style=None, target_size=None): def create_collage_with_style(self, input_dir, style=None, target_size=None):
"""创建指定样式的拼接画布""" """创建指定样式的拼接画布"""
logging.info(f"--- Starting Collage Creation for Directory: {input_dir} ---") # Start Log
try: try:
# 设置默认尺寸为3:4比例 # 设置默认尺寸为3:4比例
if target_size is None: if target_size is None:
@ -145,106 +147,123 @@ class ImageCollageCreator:
# 如果没有指定样式,随机选择一种 # 如果没有指定样式,随机选择一种
if style is None or style not in self.collage_styles: if style is None or style not in self.collage_styles:
style = random.choice(self.collage_styles) style = random.choice(self.collage_styles)
print(f"使用拼接样式: {style}") logging.info(f"Using collage style: {style} with target size: {target_size}")
# 检查目录是否存在 # 检查目录是否存在
if not os.path.exists(input_dir): if not os.path.exists(input_dir):
print(f"目录不存在: {input_dir}") logging.error(f"Input directory does not exist: {input_dir}")
return None return None
# 支持的图片格式 # 支持的图片格式
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp') image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
# 获取目录中的所有图片文件 # 获取目录中的所有文件
all_images = [f for f in os.listdir(input_dir) try:
if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(input_dir, f))] all_files = os.listdir(input_dir)
logging.info(f"Files found in directory: {all_files}")
if len(all_images) < 4: except Exception as e:
print(f"目录中图片不足四张: {input_dir}") logging.exception(f"Error listing directory {input_dir}: {e}")
return None return None
# 过滤图片文件
all_images_names = [f for f in all_files
if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(input_dir, f))]
logging.info(f"Filtered image files: {all_images_names}")
# 根据不同样式,可能需要的图片数量不同 if not all_images_names:
logging.warning(f"No valid image files found in directory: {input_dir}")
return None # Return None if no images found
# 根据不同样式,确定需要的图片数量
# ... (logic for num_images based on style) ...
num_images = 4 num_images = 4
if style == "mosaic": if style == "mosaic":
num_images = 9 num_images = 9
elif style == "filmstrip": elif style == "filmstrip":
num_images = 5 num_images = 5
elif style == "fullscreen": elif style == "fullscreen":
num_images = 6 # 全覆盖样式使用6张图片 num_images = 6
elif style == "vertical_stack": elif style == "vertical_stack":
num_images = 2 # 上下拼图样式只需要2张图片 num_images = 2
logging.info(f"Style '{style}' requires {num_images} images.")
# 确保有足够的图像 # 确保有足够的图像 (或重复使用)
if len(all_images) < num_images: selected_images_names = []
print(f"样式'{style}'需要至少{num_images}张图片,但目录只有{len(all_images)}") if len(all_images_names) < num_images:
# 多次使用相同图片 logging.warning(f"Need {num_images} images for style '{style}', but only found {len(all_images_names)}. Will repeat images.")
if len(all_images) > 0: if len(all_images_names) > 0:
all_images = all_images * (num_images // len(all_images) + 1) # Repeat available images to meet the count
selected_images_names = (all_images_names * (num_images // len(all_images_names) + 1))[:num_images]
else:
logging.error("Cannot select images, none were found.") # Should not happen due to earlier check
return None
else:
# 随机选择指定数量的图片
selected_images_names = random.sample(all_images_names, num_images)
# 随机选择指定数量的图片 logging.info(f"Selected image files for collage: {selected_images_names}")
selected_images = random.sample(all_images, num_images)
print(f"随机选择的图片: {selected_images}")
# 创建空白画布
collage_image = Image.new('RGBA', target_size, (0, 0, 0, 0))
# 加载图片 # 加载图片
images = [] images = []
for img_name in selected_images: loaded_image_paths = set()
for img_name in selected_images_names:
img_path = os.path.join(input_dir, img_name) img_path = os.path.join(input_dir, img_name)
try: try:
img = Image.open(img_path).convert('RGBA') img = Image.open(img_path).convert('RGBA')
images.append(img) images.append(img)
print(f"已加载图片: {img_name}") loaded_image_paths.add(img_path)
logging.info(f"Successfully loaded image: {img_path}")
except Exception as e: except Exception as e:
print(f"加载图片 {img_name} 时出错: {e}") logging.error(f"Failed to load image {img_path}: {e}", exc_info=True) # Log exception info
# 如果某张图片加载失败,随机选择另一张图片代替 # Optionally: try to replace failed image (or just log and continue)
remaining_images = [f for f in all_images if f not in selected_images] # For simplicity now, just log and continue; the check below handles insufficient images.
if remaining_images:
replacement = random.choice(remaining_images)
selected_images.append(replacement)
try:
replacement_path = os.path.join(input_dir, replacement)
replacement_img = Image.open(replacement_path).convert('RGBA')
images.append(replacement_img)
print(f"使用替代图片: {replacement}")
except:
print(f"替代图片 {replacement} 也加载失败")
# 确保图片数量足够 # 再次检查实际加载成功的图片数量
while len(images) < num_images: if len(images) < num_images:
if images: logging.error(f"Needed {num_images} images, but only successfully loaded {len(images)}. Cannot create collage.")
images.append(random.choice(images).copy()) # Log which images failed if possible (from error logs above)
else: return None
print("没有可用的图片来创建拼贴画")
return None logging.info(f"Successfully loaded {len(images)} images for collage.")
# 创建空白画布 (moved after image loading success check)
# collage_image = Image.new('RGBA', target_size, (0, 0, 0, 0)) # This line seems unused as styles create their own canvas
# 应用所选样式 # 应用所选样式
logging.info(f"Applying style '{style}'...")
result_collage = None
if style == "grid_2x2": if style == "grid_2x2":
return self._create_grid_2x2_collage(images, target_size) result_collage = self._create_grid_2x2_collage(images, target_size)
# ... (elif for all other styles) ...
elif style == "asymmetric": elif style == "asymmetric":
return self._create_asymmetric_collage(images, target_size) result_collage = self._create_asymmetric_collage(images, target_size)
elif style == "filmstrip": elif style == "filmstrip":
return self._create_filmstrip_collage(images, target_size) result_collage = self._create_filmstrip_collage(images, target_size)
elif style == "circles": # elif style == "circles":
return self._create_circles_collage(images, target_size) # result_collage = self._create_circles_collage(images, target_size)
elif style == "polaroid": elif style == "polaroid":
return self._create_polaroid_collage(images, target_size) result_collage = self._create_polaroid_collage(images, target_size)
elif style == "overlap": elif style == "overlap":
return self._create_overlap_collage(images, target_size) result_collage = self._create_overlap_collage(images, target_size)
elif style == "mosaic": elif style == "mosaic":
return self._create_mosaic_collage(images, target_size) result_collage = self._create_mosaic_collage(images, target_size)
elif style == "fullscreen": elif style == "fullscreen":
return self._create_fullscreen_collage(images, target_size) result_collage = self._create_fullscreen_collage(images, target_size)
elif style == "vertical_stack": elif style == "vertical_stack":
return self._create_vertical_stack_collage(images, target_size) result_collage = self._create_vertical_stack_collage(images, target_size)
else: else:
# 默认使用2x2网格 logging.warning(f"Unknown style '{style}', defaulting to grid_2x2.")
return self._create_grid_2x2_collage(images, target_size) result_collage = self._create_grid_2x2_collage(images, target_size)
if result_collage is None:
logging.error(f"Collage creation failed during style application ('{style}').")
return None
else:
logging.info(f"--- Collage Creation Successful for Directory: {input_dir} ---")
return result_collage # Return the created collage image
except Exception as e: except Exception as e:
print(f"创建拼贴画时出错: {str(e)}") logging.exception(f"An unexpected error occurred during collage creation for {input_dir}: {e}") # Log full traceback
traceback.print_exc()
return None return None
def _create_grid_2x2_collage(self, images, target_size): def _create_grid_2x2_collage(self, images, target_size):
@ -689,45 +708,49 @@ class ImageCollageCreator:
def process_directory(directory_path, target_size=(900, 1200), output_count=1): def process_directory(directory_path, target_size=(900, 1200), output_count=1):
""" """
Processes images in a directory: finds main subject, adjusts contrast/saturation, 处理指定目录中的图片创建指定数量的拼贴图
performs smart cropping/resizing, creates a collage, and returns PIL Image objects.
Args:
directory_path: Path to the directory containing images.
target_size: Tuple (width, height) for the final collage.
output_count: Number of collages to generate.
Returns:
A list containing the generated PIL Image objects for the collages,
or an empty list if processing fails.
"""
image_files = [os.path.join(directory_path, f) for f in os.listdir(directory_path)
if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
if not image_files: 参数:
print(f"No images found in {directory_path}") directory_path: 包含图片的目录路径
return [] target_size: 拼贴图目标尺寸默认为 (900, 1200)
output_count: 需要生成的拼贴图数量默认为 1
# Create collage
返回:
list: 生成的拼贴图列表PIL.Image 对象如果生成失败返回空列表
"""
logging.info(f"处理目录中的图片并创建 {output_count} 个拼贴图: {directory_path}")
# 创建 ImageCollageCreator 实例
collage_creator = ImageCollageCreator()
collage_images = [] collage_images = []
# 检查目录是否存在
if not os.path.exists(directory_path):
logging.error(f"目录不存在: {directory_path}")
return []
# 尝试创建请求数量的拼贴图
for i in range(output_count): for i in range(output_count):
collage = create_collage(image_files, target_size) try:
if collage: # 随机选择一个样式(由 create_collage_with_style 内部实现)
# collage_filename = f"collage_{i}.png" # 传入 None 作为 style 参数,让函数内部随机选择
# save_path = os.path.join(output_dir, collage_filename) collage = collage_creator.create_collage_with_style(
# collage.save(save_path) directory_path,
# print(f"Collage saved to {save_path}") style=None, # 让方法内部随机选择样式
# collage_images.append({'path': save_path, 'image': collage}) target_size=target_size
collage_images.append(collage) # Return the PIL Image object directly )
else:
print(f"Failed to create collage {i}") if collage:
collage_images.append(collage)
logging.info(f"成功创建拼贴图 {i+1}/{output_count}")
else:
logging.error(f"无法创建拼贴图 {i+1}/{output_count}")
except Exception as e:
logging.exception(f"创建拼贴图 {i+1}/{output_count} 时发生异常: {e}")
logging.info(f"已处理目录 {directory_path},成功创建 {len(collage_images)}/{output_count} 个拼贴图")
return collage_images return collage_images
def create_collage(image_paths, target_size=(900, 1200)):
# ... (internal logic, including find_main_subject, adjust_image, smart_crop_and_resize) ...
pass
def find_main_subject(image): def find_main_subject(image):
# ... (keep the existing implementation) ... # ... (keep the existing implementation) ...
pass pass
@ -741,13 +764,64 @@ def smart_crop_and_resize(image, target_aspect_ratio):
pass pass
def main(): def main():
# 设置基础路径 """展示如何使用 ImageCollageCreator 和 process_directory 函数的示例。"""
base_path = "/root/autodl-tmp" logging.basicConfig(level=logging.INFO,
# 默认图片目录 format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s')
input_dir = os.path.join(base_path, "陈家祠")
## 考虑一下,是否需要直接传递图片结果 # 示例目录路径 - 根据实际情况修改
# 处理目录中的图片生成10个随机风格拼贴画 test_directory = "/root/autodl-tmp/sanming_img/modify/古田会议旧址" # 修改为你实际的图片目录
process_directory(input_dir, output_count=10)
logging.info(f"测试目录: {test_directory}")
# 方法 1: 使用 process_directory 函数 (推荐用于外部调用)
logging.info("方法 1: 使用 process_directory 函数生成拼贴图...")
collages_1 = process_directory(
directory_path=test_directory,
target_size=(900, 1200), # 默认 3:4 比例
output_count=2 # 创建 2 张不同的拼贴图
)
if collages_1:
logging.info(f"成功创建了 {len(collages_1)} 张拼贴图 (使用 process_directory)")
# 可选: 保存图片到文件
for i, collage in enumerate(collages_1):
output_path = f"/tmp/collage_method1_{i}.png"
collage.save(output_path)
logging.info(f"拼贴图已保存到: {output_path}")
else:
logging.error("使用 process_directory 创建拼贴图失败")
# 方法 2: 直接使用 ImageCollageCreator 类 (用于更精细的控制)
logging.info("方法 2: 直接使用 ImageCollageCreator 类...")
creator = ImageCollageCreator()
# 指定样式创建拼贴图 (可选样式: grid_2x2, asymmetric, filmstrip, overlap, mosaic, fullscreen, vertical_stack)
styles_to_try = ["grid_2x2", "overlap", "mosaic"]
collages_2 = []
for style in styles_to_try:
logging.info(f"尝试使用样式: {style}")
collage = creator.create_collage_with_style(
input_dir=test_directory,
style=style,
target_size=(800, 1000) # 自定义尺寸
)
if collage:
collages_2.append(collage)
# 可选: 保存图片到文件
output_path = f"/tmp/collage_method2_{style}.png"
collage.save(output_path)
logging.info(f"使用样式 '{style}' 的拼贴图已保存到: {output_path}")
else:
logging.error(f"使用样式 '{style}' 创建拼贴图失败")
logging.info(f"总共成功创建了 {len(collages_2)} 张拼贴图 (使用 ImageCollageCreator)")
# 比较两种方法
logging.info("===== 拼贴图创建测试完成 =====")
logging.info(f"方法 1 (process_directory): {len(collages_1)} 张拼贴图")
logging.info(f"方法 2 (直接使用 ImageCollageCreator): {len(collages_2)} 张拼贴图")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -46,7 +46,7 @@
"content_temperature": 0.3, "content_temperature": 0.3,
"content_top_p": 0.4, "content_top_p": 0.4,
"content_presence_penalty": 1.5, "content_presence_penalty": 1.5,
"request_timeout": 30, "request_timeout": 120,
"max_retries": 3, "max_retries": 3,
"description_filename": "description.txt", "description_filename": "description.txt",
"output_collage_subdir": "collage_img", "output_collage_subdir": "collage_img",

View File

@ -572,16 +572,28 @@ def generate_posters_for_topic(topic_item: dict,
# --- 使用 OutputHandler 保存 Poster Config --- # --- 使用 OutputHandler 保存 Poster Config ---
output_handler.handle_poster_configs(run_id, topic_index, poster_text_configs_raw) output_handler.handle_poster_configs(run_id, topic_index, poster_text_configs_raw)
# --- 结束使用 Handler 保存 --- # --- 结束使用 Handler 保存 ---
# 打印原始配置数据以进行调试
logging.info(f"生成的海报配置数据: {poster_text_configs_raw}")
poster_config_summary = core_posterGen.PosterConfig(poster_text_configs_raw) # 直接使用配置数据,避免通过文件读取
if isinstance(poster_text_configs_raw, list):
poster_configs = poster_text_configs_raw
logging.info(f"直接使用生成的配置列表,包含 {len(poster_configs)} 个配置项")
else:
# 如果不是列表尝试转换或使用PosterConfig类解析
logging.info("生成的配置数据不是列表使用PosterConfig类进行处理")
poster_config_summary = core_posterGen.PosterConfig(poster_text_configs_raw)
poster_configs = poster_config_summary.get_config()
except Exception as e: except Exception as e:
logging.exception("Error running ContentGenerator or parsing poster configs:") logging.exception("Error running ContentGenerator or parsing poster configs:")
traceback.print_exc() traceback.print_exc()
return False return False
# Poster Generation Loop for each variant # Poster Generation Loop for each variant
poster_num = variants poster_num = min(variants, len(poster_configs)) if isinstance(poster_configs, list) else variants
logging.info(f"计划生成 {poster_num} 个海报变体")
any_poster_attempted = False any_poster_attempted = False
for j_index in range(poster_num): for j_index in range(poster_num):
@ -591,7 +603,15 @@ def generate_posters_for_topic(topic_item: dict,
collage_img = None # To store the generated collage PIL Image collage_img = None # To store the generated collage PIL Image
poster_img = None # To store the final poster PIL Image poster_img = None # To store the final poster PIL Image
try: try:
poster_config = poster_config_summary.get_config_by_index(j_index) # 获取当前变体的配置
if isinstance(poster_configs, list) and j_index < len(poster_configs):
poster_config = poster_configs[j_index]
logging.info(f"使用配置数据项 {j_index+1}: {poster_config}")
else:
# 回退方案使用PosterConfig类
poster_config = poster_config_summary.get_config_by_index(j_index)
logging.info(f"使用PosterConfig类获取配置项 {j_index+1}")
if not poster_config: if not poster_config:
logging.warning(f"Warning: Could not get poster config for index {j_index}. Skipping.") logging.warning(f"Warning: Could not get poster config for index {j_index}. Skipping.")
continue continue
@ -627,9 +647,16 @@ def generate_posters_for_topic(topic_item: dict,
} }
texts = poster_config.get('texts', []) texts = poster_config.get('texts', [])
if texts: if texts:
text_data["additional_texts"].append({"text": texts[0], "position": "bottom", "size_factor": 0.5}) # 确保文本不为空
if len(texts) > 1 and random.random() < text_possibility: if texts[0]:
text_data["additional_texts"].append({"text": texts[1], "position": "bottom", "size_factor": 0.5}) text_data["additional_texts"].append({"text": texts[0], "position": "bottom", "size_factor": 0.5})
# 添加第二个文本(如果有并且满足随机条件)
if len(texts) > 1 and texts[1] and random.random() < text_possibility:
text_data["additional_texts"].append({"text": texts[1], "position": "bottom", "size_factor": 0.5})
# 打印要发送的文本数据
logging.info(f"文本数据: {text_data}")
# 调用修改后的 create_poster, 接收 PIL Image # 调用修改后的 create_poster, 接收 PIL Image
poster_img = poster_gen_instance.create_poster(collage_img, text_data) poster_img = poster_gen_instance.create_poster(collage_img, text_data)