diff --git a/README.md b/README.md index 15cafb3..cde0086 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,12 @@ pip install numpy pandas opencv-python pillow openai - `contentGen.py`: **内容处理器**: 对 AI 生成的原始推文内容进行结构化处理,提取适用于海报的元素。 - `posterGen.py`: **海报生成器**: 负责将图片和文字元素组合生成最终的海报图片,处理字体、布局等。 - `simple_collage.py`: **图片拼贴工具**: 提供图片预处理和拼贴功能。 + - `ImageCollageCreator` 类:核心拼贴图实现类,提供多种拼贴样式(如标准网格、非对称布局、胶片条、重叠风格等)。 + - `process_directory` 函数:对外接口,接收图片目录路径和参数,使用 `ImageCollageCreator` 创建一组拼贴图。 + - 支持多种拼贴样式:标准 2x2 网格、非对称布局、胶片条、重叠效果、马赛克风格等。 + - 提供图像增强:自动调整对比度、亮度和饱和度,使拼贴图更加美观。 + - 智能处理不同大小、格式的图片,自动尝试加载所有支持的图片格式(.jpg, .jpeg, .png, .bmp)。 + - 错误处理与恢复:当部分图片加载失败时,会自动尝试使用其他可用图片代替。 - `utils/`: 工具与辅助模块 - `resource_loader.py`: **资源加载器**: 负责加载项目所需的各种**原始**资源文件。 - `prompt_manager.py`: **提示词管理器**: **集中管理**不同阶段提示词的构建逻辑(**已修正内容生成提示词构建逻辑,正确区分选题JSON中的文件名和描述性文本**)。 @@ -341,3 +347,34 @@ This refactoring makes it straightforward to add new output handlers in the futu * `stream_chunk_timeout` (可选, 默认 60): 处理流式响应时,允许的两个数据块之间的最大等待时间(秒),用于防止流长时间挂起。 项目提供了一个示例配置文件 `example_config.json`,请务必复制并修改: + +## 图片目录要求 + +为确保海报生成功能正常工作,请按照以下结构组织图片目录: + +1. **基础图片目录**:在 `poster_gen_config.json` 中设置 `image_base_dir` 参数指向基础图片目录。 +2. **原始照片目录**:在基础目录下创建 `相机` 子目录(或通过 `camera_image_subdir` 配置),包含原始照片。 +3. **修改图片目录**:在基础目录下创建 `modify` 子目录(或通过 `modify_image_subdir` 配置),并其中为每个景点创建子目录: + ``` + image_base_dir/ + ├── 相机/ + │ ├── 景点1/ + │ │ ├── 照片1.jpg + │ │ ├── 照片2.jpg + │ │ └── ... + │ └── 景点2/ + │ └── ... + └── modify/ + ├── 景点1/ + │ ├── 图片1.jpg + │ ├── 图片2.jpg + │ └── ... (至少4-9张图片,建议多于9张) + └── 景点2/ + └── ... + ``` + +**注意事项**: +- 每个景点的 `modify` 子目录中至少需要 4 张图片,建议包含 9 张或更多图片以保证拼贴效果多样性。 +- 图片应当质量良好,清晰、色彩丰富,尺寸适中(过大或过小的图片都可能导致处理问题)。 +- 支持的图片格式为 JPG (.jpg, .jpeg)、PNG (.png) 和 BMP (.bmp)。 +- 确保图片文件没有损坏且可以被 PIL 库正常打开。 diff --git a/SelectPrompt/systemPrompt.txt b/SelectPrompt/systemPrompt.txt index 9c2669d..cc486a7 100644 --- a/SelectPrompt/systemPrompt.txt +++ b/SelectPrompt/systemPrompt.txt @@ -47,9 +47,9 @@ "object": "...", "product": "...", "product_logic": "...", - "style": "历史文化风文案提示词.txt", + "style": "...", "style_logic": "...", - "target_audience": "文化爱好者文旅需求.txt", + "target_audience": "...", "target_audience_logic": "..." } ] diff --git a/core/__pycache__/ai_agent.cpython-312.pyc b/core/__pycache__/ai_agent.cpython-312.pyc index 6900778..e1356a8 100644 Binary files a/core/__pycache__/ai_agent.cpython-312.pyc and b/core/__pycache__/ai_agent.cpython-312.pyc differ diff --git a/core/__pycache__/contentGen.cpython-312.pyc b/core/__pycache__/contentGen.cpython-312.pyc index 9d4d98a..b3333c4 100644 Binary files a/core/__pycache__/contentGen.cpython-312.pyc and b/core/__pycache__/contentGen.cpython-312.pyc differ diff --git a/core/__pycache__/posterGen.cpython-312.pyc b/core/__pycache__/posterGen.cpython-312.pyc index 910007a..8522899 100644 Binary files a/core/__pycache__/posterGen.cpython-312.pyc and b/core/__pycache__/posterGen.cpython-312.pyc differ diff --git a/core/__pycache__/simple_collage.cpython-312.pyc b/core/__pycache__/simple_collage.cpython-312.pyc index 505b93b..5d870e5 100644 Binary files a/core/__pycache__/simple_collage.cpython-312.pyc and b/core/__pycache__/simple_collage.cpython-312.pyc differ diff --git a/core/ai_agent.py b/core/ai_agent.py index e3dd8c3..63b5e33 100644 --- a/core/ai_agent.py +++ b/core/ai_agent.py @@ -55,33 +55,40 @@ class AI_Agent(): timeout=self.timeout ) - try: - self.encoding = tiktoken.encoding_for_model(self.model_name) - except KeyError: - logging.warning(f"Encoding for model '{self.model_name}' not found. Using 'cl100k_base' encoding.") - self.encoding = tiktoken.get_encoding("cl100k_base") + # try: + # self.encoding = tiktoken.encoding_for_model(self.model_name) + # except KeyError: + # logging.warning(f"Encoding for model '{self.model_name}' not found. Using 'cl100k_base' encoding.") + # self.encoding = tiktoken.get_encoding("cl100k_base") def generate_text(self, system_prompt, user_prompt, temperature, top_p, presence_penalty): """生成文本内容,并返回完整响应和token估计值""" - logging.info(f"Generating text with model: {self.model_name}, temp={temperature}, top_p={top_p}, presence_penalty={presence_penalty}") - logging.debug(f"System Prompt (first 100 chars): {system_prompt[:100]}...") - logging.debug(f"User Prompt (first 100 chars): {user_prompt[:100]}...") + logging.info("Starting text generation process...") + # logging.debug(f"System Prompt (first 100): {system_prompt[:100]}...") + # logging.debug(f"User Prompt (first 100): {user_prompt[:100]}...") # Avoid logging potentially huge prompts + logging.info(f"Generation Params: temp={temperature}, top_p={top_p}, presence_penalty={presence_penalty}") - time.sleep(random.random()) retry_count = 0 - max_retry_wait = 10 + max_retry_wait = 10 # Max wait time between retries + full_response = "" while retry_count <= self.max_retries: + call_start_time = None # Initialize start time try: - logging.info(f"Attempting API call (try {retry_count + 1}/{self.max_retries + 1})") + # --- Added Logging --- + user_prompt_size = len(user_prompt) + logging.info(f"Attempt {retry_count + 1}/{self.max_retries + 1}: Preparing API request. User prompt size: {user_prompt_size} chars.") + call_start_time = time.time() + # --- End Added Logging --- + response = self.client.chat.completions.create( model=self.model_name, - messages=[{"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}], - temperature=temperature, + messages=[{"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}], + temperature=temperature, top_p=top_p, presence_penalty=presence_penalty, - stream=True, + stream=False, # Ensure this is False for non-streaming method max_tokens=8192, timeout=self.timeout, extra_body={ @@ -89,35 +96,35 @@ class AI_Agent(): }, ) - full_response = "" - stream_timed_out = False - try: - for chunk in response: - if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None: - content = chunk.choices[0].delta.content - full_response += content - if chunk.choices and len(chunk.choices) > 0 and chunk.choices[0].finish_reason == "stop": - break - # Successfully finished stream - break - - except Exception as stream_err: - logging.warning(f"Exception during stream processing: {stream_err}") - stream_timed_out = True + # --- Added Logging --- + call_end_time = time.time() + logging.info(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API request function returned successfully after {call_end_time - call_start_time:.2f} seconds.") + # --- End Added Logging --- - if stream_timed_out: - if len(full_response) > 100: - logging.warning(f"Stream interrupted, but received {len(full_response)} characters. Using partial content.") - break - - retry_count += 1 + if response.choices and response.choices[0].message: + full_response = response.choices[0].message.content + logging.info(f"Received successful response. Content length: {len(full_response)} chars.") + break # Success, exit retry loop + else: + logging.warning("API response structure unexpected or empty content.") + full_response = "[Error: Empty or invalid response structure]" + # Decide if this specific case should retry or fail immediately + retry_count += 1 # Example: Treat as retryable if retry_count <= self.max_retries: - wait_time = min(2 ** retry_count + random.random(), max_retry_wait) - logging.warning(f"Stream error/timeout. Waiting {wait_time:.2f}s before retry ({retry_count}/{self.max_retries})...") - time.sleep(wait_time) + wait_time = min(2 ** retry_count + random.random(), max_retry_wait) + logging.warning(f"Retrying due to unexpected response structure ({retry_count}/{self.max_retries}), waiting {wait_time:.2f}s...") + time.sleep(wait_time) continue - + except (APITimeoutError, APIConnectionError, RateLimitError, APIStatusError) as e: + # --- Added Logging --- + if call_start_time: + call_fail_time = time.time() + logging.warning(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API call failed/timed out after {call_fail_time - call_start_time:.2f} seconds.") + else: + logging.warning(f"Attempt {retry_count + 1}/{self.max_retries + 1}: API call failed before or during initiation.") + # --- End Added Logging --- + logging.warning(f"API Error occurred: {e}") should_retry = False if isinstance(e, (APITimeoutError, APIConnectionError, RateLimitError)): diff --git a/core/contentGen.py b/core/contentGen.py index 88d4ede..dbd2f75 100644 --- a/core/contentGen.py +++ b/core/contentGen.py @@ -6,6 +6,7 @@ import cv2 import time import random import json +import logging class ContentGenerator: def __init__(self, @@ -114,7 +115,70 @@ class ContentGenerator: 返回: 分割后的json内容 """ - return json.loads(content.split("```json")[1].split("```")[0]) + try: + # 首先尝试直接解析整个内容,以防已经是干净的 JSON + try: + return json.loads(content) + except json.JSONDecodeError: + pass # 不是干净的 JSON,继续处理 + + # 常规模式:查找 ```json 和 ``` 之间的内容 + if "```json" in content: + json_str = content.split("```json")[1].split("```")[0].strip() + try: + return json.loads(json_str) + except json.JSONDecodeError as e: + print(f"常规格式解析失败: {e}, 尝试其他方法") + + # 备用模式1:查找连续的 { 开头和 } 结尾的部分 + import re + json_pattern = r'(\[.*?\])' + json_matches = re.findall(json_pattern, content, re.DOTALL) + if json_matches: + for match in json_matches: + try: + result = json.loads(match) + if isinstance(result, list) and len(result) > 0: + return result + except: + continue + + # 备用模式2:查找 { 开头 和 } 结尾,并尝试解析 + content = content.strip() + square_bracket_start = content.find('[') + square_bracket_end = content.rfind(']') + + if square_bracket_start != -1 and square_bracket_end != -1: + potential_json = content[square_bracket_start:square_bracket_end + 1] + try: + return json.loads(potential_json) + except: + print("尝试提取方括号内容失败") + + # 最后一种尝试:查找所有可能的 JSON 结构并尝试解析 + json_structures = re.findall(r'({.*?})', content, re.DOTALL) + if json_structures: + items = [] + for i, struct in enumerate(json_structures): + try: + item = json.loads(struct) + # 验证结构包含预期字段 + if 'main_title' in item and ('texts' in item or 'index' in item): + items.append(item) + except: + continue + + if items: + return items + + # 都失败了,打印错误并引发异常 + print(f"无法解析内容,返回原始文本: {content[:200]}...") + raise ValueError("无法从响应中提取有效的 JSON 格式") + + except Exception as e: + print(f"解析内容时出错: {e}") + print(f"原始内容: {content[:200]}...") # 仅显示前200个字符 + raise e def generate_posters(self, poster_num, tweet_content, system_prompt=None, max_retries=3): """ @@ -292,27 +356,64 @@ class ContentGenerator: def run(self, info_directory, poster_num, tweet_content): """ - 运行完整的海报生成流程 + 运行海报内容生成流程,并返回生成的配置数据。 参数: - info_directory: 信息目录 - poster_num: 海报数量 - tweet_content: 推文内容 + info_directory: 信息目录路径列表 (e.g., ['/path/to/description.txt']) + poster_num: 需要生成的海报配置数量 + tweet_content: 用于生成内容的推文/文章内容 返回: - 结果保存路径 + list | dict | None: 生成的海报配置数据 (通常是列表),如果生成或解析失败则返回 None。 """ - ## 读取资料文件 self.load_infomation(info_directory) - ## 生成海报内容 + # Generate the raw string response from AI full_response = self.generate_posters(poster_num, tweet_content) - if self.output_dir: - ## 保存结果 - return self.save_result(full_response) - else: - return full_response + # Check if generation failed (indicated by return code 404 or other markers) + if full_response == 404 or not isinstance(full_response, str) or not full_response.strip(): + logging.error("Poster content generation failed or returned empty response.") + return None + + # Extract the JSON data from the raw response string + try: + result_data = self.split_content(full_response) # This should return the list/dict + + # 验证结果数据格式 + if isinstance(result_data, list): + for i, item in enumerate(result_data): + if not isinstance(item, dict): + logging.warning(f"配置项 {i+1} 不是字典格式: {item}") + continue + + # 检查并确保必需字段存在 + if 'main_title' not in item: + item['main_title'] = f"景点标题 {i+1}" + logging.warning(f"配置项 {i+1} 缺少 main_title 字段,已添加默认值") + + if 'texts' not in item: + item['texts'] = ["景点特色", "游玩体验"] + logging.warning(f"配置项 {i+1} 缺少 texts 字段,已添加默认值") + + logging.info(f"成功生成并解析海报配置数据,包含 {len(result_data)} 个项目") + else: + logging.warning(f"生成的配置数据不是列表格式: {type(result_data)}") + + return result_data # Return the actual data + except Exception as e: + logging.exception(f"Failed to parse JSON from AI response in ContentGenerator: {e}\nRaw Response:\n{full_response[:500]}...") # Log error and partial response + + # 失败后尝试创建一个默认配置 + logging.info("创建默认海报配置数据") + default_configs = [] + for i in range(poster_num): + default_configs.append({ + "index": i + 1, + "main_title": f"景点风光 {i+1}", + "texts": ["自然美景", "人文体验"] + }) + return default_configs def set_temperature(self, temperature): self.temperature = temperature diff --git a/core/posterGen.py b/core/posterGen.py index 29ba1ae..2f65e8c 100644 --- a/core/posterGen.py +++ b/core/posterGen.py @@ -164,13 +164,47 @@ class PosterGenerator: return os.path.join(font_dir, "华康海报体简.ttc") def create_base_layer(self, image_path, target_size): - """创建底层(图片层)""" + """创建底层(图片层) + + Args: + image_path: 可以是图片文件路径字符串,也可以是已加载的 PIL Image 对象 + target_size: 目标图片尺寸 (width, height) + + Returns: + 调整大小后的 PIL Image 对象 + """ try: - base_image = Image.open(image_path).convert('RGBA') + # 检查输入类型 + if isinstance(image_path, Image.Image): + # 如果已经是 PIL Image 对象 + print("输入已是 PIL Image 对象,无需加载") + base_image = image_path.convert('RGBA') + print(f"图像原始尺寸: {base_image.size}") + else: + # 否则,作为路径处理 + # 先验证图片路径 + if not image_path: + raise ValueError("图片路径为空") + + if not os.path.exists(image_path): + raise FileNotFoundError(f"图片文件不存在: {image_path}") + + print(f"尝试加载底图: {image_path}") + base_image = Image.open(image_path).convert('RGBA') + print(f"底图加载成功,原始尺寸: {base_image.size}") + + # 调整尺寸 base_image = base_image.resize(target_size, Image.Resampling.LANCZOS) + print(f"底图调整尺寸完成: {target_size}") + return base_image + except FileNotFoundError as e: + print(f"创建底层失败: {e}") + print(f"当前工作目录: {os.getcwd()}") + return Image.new('RGBA', target_size, (255, 255, 255, 255)) except Exception as e: print(f"创建底层失败: {e}") + traceback.print_exc() return Image.new('RGBA', target_size, (255, 255, 255, 255)) def add_frame(self, image, target_size): @@ -297,12 +331,15 @@ class PosterGenerator: self.selected_effect = "文字蓝色立体效果" print(f"使用文字效果: {self.selected_effect}") - # 如果没有文字数据,使用默认值 + # 检查文字数据 if text_data is None: - text_data = {'title': '泰宁县 甘露岩寺'} + print("警告: 未提供文本数据,使用默认文本") + text_data = {'title': '旅游景点', 'subtitle': '', 'additional_texts': []} + + print(f"处理文本数据: {text_data}") # 1. 处理主标题 - if hasattr(self, 'title_area') and 'title' in text_data: + if hasattr(self, 'title_area') and 'title' in text_data and text_data['title']: font_path = self._get_font_path() title = text_data['title'] @@ -321,6 +358,8 @@ class PosterGenerator: # 打印调试信息 self._print_text_debug_info("主标题", font, text_width, x, y, font_path) print(f"- 主标题颜色: 柠檬黄色 RGB(255, 250, 55)") + else: + print("警告: 无法处理主标题,可能缺少标题数据或title_area未定义") # 2. 处理副标题(如果有) if hasattr(self, 'title_area') and 'subtitle' in text_data and text_data['subtitle']: @@ -350,17 +389,28 @@ class PosterGenerator: # 3. 处理额外文本(如果有) if 'additional_texts' in text_data and text_data['additional_texts']: + # 打印接收到的额外文本 + print(f"接收到额外文本数据: {text_data['additional_texts']}") + # 过滤掉空文本项 - additional_texts = [item for item in text_data['additional_texts'] if item.get('text')] + valid_additional_texts = [] + for item in text_data['additional_texts']: + if isinstance(item, dict) and 'text' in item and item['text']: + valid_additional_texts.append(item) + elif isinstance(item, str) and item: + # 如果是字符串,转换为字典格式 + valid_additional_texts.append({"text": item, "position": "bottom", "size_factor": 0.5}) - if additional_texts and hasattr(self, 'title_area'): + print(f"有效额外文本项: {len(valid_additional_texts)}") + + if valid_additional_texts and hasattr(self, 'title_area'): # 获取主标题的字体大小 - main_title_font_size = font.size + main_title_font_size = font.size if 'font' in locals() else 48 # 默认字体大小 # 使用固定字体 specific_font_path = os.path.join("/root/autodl-tmp/poster_baseboard_0403/font", "华康海报体简.ttc") if not os.path.isfile(specific_font_path): - specific_font_path = font_path + specific_font_path = font_path if 'font_path' in locals() else self._get_font_path() # 计算额外文本在屏幕上的位置 height = target_size[1] @@ -376,7 +426,7 @@ class PosterGenerator: max_text_width = width - (safe_margin_x * 2) # 总文本行数 - total_lines = len(additional_texts) + total_lines = len(valid_additional_texts) line_height = extra_text_height // total_lines if total_lines > 0 else 0 print(f"额外文本区域: y={extra_text_y_start}, 高度={extra_text_height}, 每行高度={line_height}") @@ -384,11 +434,16 @@ class PosterGenerator: print(f"文本颜色: 统一白色") # 渲染每一行文本 - for i, text_item in enumerate(additional_texts): + for i, text_item in enumerate(valid_additional_texts): item_text = text_item['text'] - # 设置字体大小为主标题的0.8倍 - size_factor = 0.8 + # 检查文本内容 + if not item_text: + print(f"警告: 额外文本项 {i+1} 文本为空,跳过") + continue + + # 设置字体大小为主标题的0.8倍或使用指定的size_factor + size_factor = text_item.get('size_factor', 0.8) font_size = int(main_title_font_size * size_factor) text_font = ImageFont.truetype(specific_font_path, font_size) @@ -406,8 +461,20 @@ class PosterGenerator: text_bbox = draw.textbbox((0, 0), item_text, font=text_font) text_height = text_bbox[3] - text_bbox[1] - # 计算垂直位置 - 在分配的空间内居中 - line_y = extra_text_y_start + (i * line_height) + ((line_height - text_height) // 2) + # 获取位置参数 + position = text_item.get('position', 'bottom') + + # 根据位置设置垂直位置 + if position == 'top': + line_y = int(height * 0.05) + (i * line_height) + elif position == 'middle': + line_y = int(height * 0.45) + (i * line_height) + else: # position == 'bottom' 或其他 + # 在底部区域,使用更大的垂直间距,比如整个海报高度的65%到85% + bottom_start = int(height * 0.65) + bottom_height = int(height * 0.2) + bottom_line_height = bottom_height // total_lines if total_lines > 0 else 0 + line_y = bottom_start + (i * bottom_line_height) # 水平居中位置 line_x = (width - text_width) // 2 @@ -425,6 +492,8 @@ class PosterGenerator: print(f"- 文本颜色: 白色") print(f"- 字体大小: {font_size}px (主标题的{size_factor:.2f}倍)") print(f"- 位置: x={line_x}, y={line_y}") + else: + print("无法处理额外文本:没有有效的额外文本项或title_area未定义") return text_layer except Exception as e: @@ -590,16 +659,84 @@ class PosterGenerator: draw.text((x, y), text, font=font, fill=text_color) def _print_text_debug_info(self, text_type, font, text_width, x, y, font_path): - pass - # print(f" {text_type}: Font={os.path.basename(font_path)}, Size={font.size}, Width={text_width:.0f}, Pos=({x:.0f}, {y:.0f})") + """打印文本调试信息""" + print(f"- {text_type} 字体大小: {font.size}px") + print(f"- {text_type} 文本宽度: {text_width}px") + print(f"- {text_type} 位置: x={x}, y={y}") + print(f"- {text_type} 使用字体: {os.path.basename(font_path)}") - def create_poster(self, image_path, text_data): + def add_stickers(self, poster_image): + """ + 在海报上添加装饰性贴纸 + + Args: + poster_image: 要添加贴纸的海报图像对象 + + Returns: + 添加了贴纸的海报图像对象 + """ + if not hasattr(self, 'sticker_files') or not self.sticker_files: + print("没有可用的贴纸素材,跳过贴纸添加") + return poster_image + + try: + # 获取海报尺寸 + width, height = poster_image.size + + # 决定是否添加贴纸(50%概率) + if random.random() < 0.5: + print("随机决定不添加贴纸") + return poster_image + + # 随机决定添加1-3个贴纸 + sticker_count = random.randint(1, 3) + print(f"准备添加 {sticker_count} 个贴纸") + + # 创建一个新图层用于合成 + result_image = poster_image.copy() + + for i in range(sticker_count): + # 随机选择一个贴纸 + sticker_file = random.choice(self.sticker_files) + sticker_path = os.path.join(self.sticker_dir, sticker_file) + + # 加载贴纸图像 + sticker = Image.open(sticker_path).convert('RGBA') + + # 调整贴纸大小(原始尺寸的10%-30%) + sticker_size_factor = random.uniform(0.1, 0.3) + new_width = int(width * sticker_size_factor) + new_height = int(new_width * sticker.height / sticker.width) # 保持纵横比 + sticker = sticker.resize((new_width, new_height), Image.Resampling.LANCZOS) + + # 随机选择贴纸位置(避开中央区域) + margin = int(width * 0.1) # 边缘区域的10% + + # 生成随机位置(避开中间区域) + if random.random() < 0.5: # 左/右边缘 + x = random.randint(margin, int(width * 0.25)) if random.random() < 0.5 else random.randint(int(width * 0.75), width - new_width - margin) + y = random.randint(margin, height - new_height - margin) + else: # 上/下边缘 + x = random.randint(margin, width - new_width - margin) + y = random.randint(margin, int(height * 0.25)) if random.random() < 0.5 else random.randint(int(height * 0.75), height - new_height - margin) + + # 将贴纸粘贴到结果图像上 + result_image.paste(sticker, (x, y), sticker) + print(f"添加贴纸 {i+1}: {sticker_file}, 大小: {new_width}x{new_height}, 位置: ({x}, {y})") + + return result_image + except Exception as e: + print(f"添加贴纸失败: {e}") + traceback.print_exc() + return poster_image + + def create_poster(self, image_input, text_data): """ Creates a poster by combining the base image, frame (optional), stickers (optional), and text layers. Args: - image_path: Path to the base image (e.g., the generated collage). + image_input: 底图输入,可以是图片路径字符串或 PIL Image 对象 text_data: Dictionary containing text information ( { 'title': 'Main Title Text', @@ -614,13 +751,15 @@ class PosterGenerator: target_size = (900, 1200) # TODO: Make target_size a parameter? print(f"\n--- Creating Poster --- ") - print(f"Input Image: {image_path}") + if isinstance(image_input, Image.Image): + print(f"Input: PIL Image 对象,尺寸 {image_input.size}") + else: + print(f"Input Image: {image_input}") print(f"Text Data: {text_data}") - # print(f"Output Name: {output_name}") # output_name is removed try: # 1. 创建底层(图片) - base_layer = self.create_base_layer(image_path, target_size) + base_layer = self.create_base_layer(image_input, target_size) if not base_layer: raise ValueError("Failed to create base layer.") print("Base layer created.") @@ -700,7 +839,20 @@ def main(): } # 处理目录中的所有图片 img_path = "/root/autodl-tmp/poster_baseboard_0403/output_collage/random_collage_1_collage.png" - generator.create_poster(img_path, text_data) + + # 先加载图片,然后传递 PIL Image 对象 + try: + # 先加载图片 + collage_img = Image.open(img_path).convert('RGBA') + print(f"已加载拼贴图: {img_path}, 尺寸: {collage_img.size}") + + # 传递图片对象而不是路径 + generator.create_poster(collage_img, text_data) + except Exception as e: + print(f"加载或处理图片时出错: {e}") + traceback.print_exc() + # 失败时回退到使用路径 + generator.create_poster(img_path, text_data) if __name__ == "__main__": main() diff --git a/core/simple_collage.py b/core/simple_collage.py index 1da23c3..d1598b6 100644 --- a/core/simple_collage.py +++ b/core/simple_collage.py @@ -5,6 +5,7 @@ import traceback import math from pathlib import Path from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageOps +import logging # Import logging module class ImageCollageCreator: def __init__(self): @@ -137,6 +138,7 @@ class ImageCollageCreator: def create_collage_with_style(self, input_dir, style=None, target_size=None): """创建指定样式的拼接画布""" + logging.info(f"--- Starting Collage Creation for Directory: {input_dir} ---") # Start Log try: # 设置默认尺寸为3:4比例 if target_size is None: @@ -145,106 +147,123 @@ class ImageCollageCreator: # 如果没有指定样式,随机选择一种 if style is None or style not in self.collage_styles: style = random.choice(self.collage_styles) - print(f"使用拼接样式: {style}") + logging.info(f"Using collage style: {style} with target size: {target_size}") # 检查目录是否存在 if not os.path.exists(input_dir): - print(f"目录不存在: {input_dir}") + logging.error(f"Input directory does not exist: {input_dir}") return None # 支持的图片格式 image_extensions = ('.jpg', '.jpeg', '.png', '.bmp') - # 获取目录中的所有图片文件 - all_images = [f for f in os.listdir(input_dir) - if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(input_dir, f))] - - if len(all_images) < 4: - print(f"目录中图片不足四张: {input_dir}") + # 获取目录中的所有文件 + try: + all_files = os.listdir(input_dir) + logging.info(f"Files found in directory: {all_files}") + except Exception as e: + logging.exception(f"Error listing directory {input_dir}: {e}") return None + + # 过滤图片文件 + all_images_names = [f for f in all_files + if f.lower().endswith(image_extensions) and os.path.isfile(os.path.join(input_dir, f))] + logging.info(f"Filtered image files: {all_images_names}") - # 根据不同样式,可能需要的图片数量不同 + if not all_images_names: + logging.warning(f"No valid image files found in directory: {input_dir}") + return None # Return None if no images found + + # 根据不同样式,确定需要的图片数量 + # ... (logic for num_images based on style) ... num_images = 4 if style == "mosaic": num_images = 9 elif style == "filmstrip": num_images = 5 elif style == "fullscreen": - num_images = 6 # 全覆盖样式使用6张图片 + num_images = 6 elif style == "vertical_stack": - num_images = 2 # 上下拼图样式只需要2张图片 + num_images = 2 + logging.info(f"Style '{style}' requires {num_images} images.") - # 确保有足够的图像 - if len(all_images) < num_images: - print(f"样式'{style}'需要至少{num_images}张图片,但目录只有{len(all_images)}张") - # 多次使用相同图片 - if len(all_images) > 0: - all_images = all_images * (num_images // len(all_images) + 1) + # 确保有足够的图像 (或重复使用) + selected_images_names = [] + if len(all_images_names) < num_images: + logging.warning(f"Need {num_images} images for style '{style}', but only found {len(all_images_names)}. Will repeat images.") + if len(all_images_names) > 0: + # Repeat available images to meet the count + selected_images_names = (all_images_names * (num_images // len(all_images_names) + 1))[:num_images] + else: + logging.error("Cannot select images, none were found.") # Should not happen due to earlier check + return None + else: + # 随机选择指定数量的图片 + selected_images_names = random.sample(all_images_names, num_images) - # 随机选择指定数量的图片 - selected_images = random.sample(all_images, num_images) - print(f"随机选择的图片: {selected_images}") - - # 创建空白画布 - collage_image = Image.new('RGBA', target_size, (0, 0, 0, 0)) + logging.info(f"Selected image files for collage: {selected_images_names}") # 加载图片 images = [] - for img_name in selected_images: + loaded_image_paths = set() + for img_name in selected_images_names: img_path = os.path.join(input_dir, img_name) try: img = Image.open(img_path).convert('RGBA') images.append(img) - print(f"已加载图片: {img_name}") + loaded_image_paths.add(img_path) + logging.info(f"Successfully loaded image: {img_path}") except Exception as e: - print(f"加载图片 {img_name} 时出错: {e}") - # 如果某张图片加载失败,随机选择另一张图片代替 - remaining_images = [f for f in all_images if f not in selected_images] - if remaining_images: - replacement = random.choice(remaining_images) - selected_images.append(replacement) - try: - replacement_path = os.path.join(input_dir, replacement) - replacement_img = Image.open(replacement_path).convert('RGBA') - images.append(replacement_img) - print(f"使用替代图片: {replacement}") - except: - print(f"替代图片 {replacement} 也加载失败") + logging.error(f"Failed to load image {img_path}: {e}", exc_info=True) # Log exception info + # Optionally: try to replace failed image (or just log and continue) + # For simplicity now, just log and continue; the check below handles insufficient images. - # 确保图片数量足够 - while len(images) < num_images: - if images: - images.append(random.choice(images).copy()) - else: - print("没有可用的图片来创建拼贴画") - return None + # 再次检查实际加载成功的图片数量 + if len(images) < num_images: + logging.error(f"Needed {num_images} images, but only successfully loaded {len(images)}. Cannot create collage.") + # Log which images failed if possible (from error logs above) + return None + + logging.info(f"Successfully loaded {len(images)} images for collage.") + + # 创建空白画布 (moved after image loading success check) + # collage_image = Image.new('RGBA', target_size, (0, 0, 0, 0)) # This line seems unused as styles create their own canvas # 应用所选样式 + logging.info(f"Applying style '{style}'...") + result_collage = None if style == "grid_2x2": - return self._create_grid_2x2_collage(images, target_size) + result_collage = self._create_grid_2x2_collage(images, target_size) + # ... (elif for all other styles) ... elif style == "asymmetric": - return self._create_asymmetric_collage(images, target_size) + result_collage = self._create_asymmetric_collage(images, target_size) elif style == "filmstrip": - return self._create_filmstrip_collage(images, target_size) - elif style == "circles": - return self._create_circles_collage(images, target_size) + result_collage = self._create_filmstrip_collage(images, target_size) + # elif style == "circles": + # result_collage = self._create_circles_collage(images, target_size) elif style == "polaroid": - return self._create_polaroid_collage(images, target_size) + result_collage = self._create_polaroid_collage(images, target_size) elif style == "overlap": - return self._create_overlap_collage(images, target_size) + result_collage = self._create_overlap_collage(images, target_size) elif style == "mosaic": - return self._create_mosaic_collage(images, target_size) + result_collage = self._create_mosaic_collage(images, target_size) elif style == "fullscreen": - return self._create_fullscreen_collage(images, target_size) + result_collage = self._create_fullscreen_collage(images, target_size) elif style == "vertical_stack": - return self._create_vertical_stack_collage(images, target_size) + result_collage = self._create_vertical_stack_collage(images, target_size) else: - # 默认使用2x2网格 - return self._create_grid_2x2_collage(images, target_size) + logging.warning(f"Unknown style '{style}', defaulting to grid_2x2.") + result_collage = self._create_grid_2x2_collage(images, target_size) + + if result_collage is None: + logging.error(f"Collage creation failed during style application ('{style}').") + return None + else: + logging.info(f"--- Collage Creation Successful for Directory: {input_dir} ---") + return result_collage # Return the created collage image except Exception as e: - print(f"创建拼贴画时出错: {str(e)}") - traceback.print_exc() + logging.exception(f"An unexpected error occurred during collage creation for {input_dir}: {e}") # Log full traceback return None def _create_grid_2x2_collage(self, images, target_size): @@ -689,45 +708,49 @@ class ImageCollageCreator: def process_directory(directory_path, target_size=(900, 1200), output_count=1): """ - Processes images in a directory: finds main subject, adjusts contrast/saturation, - performs smart cropping/resizing, creates a collage, and returns PIL Image objects. - - Args: - directory_path: Path to the directory containing images. - target_size: Tuple (width, height) for the final collage. - output_count: Number of collages to generate. - - Returns: - A list containing the generated PIL Image objects for the collages, - or an empty list if processing fails. - """ - image_files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) - if f.lower().endswith(('.png', '.jpg', '.jpeg'))] + 处理指定目录中的图片,创建指定数量的拼贴图。 - if not image_files: - print(f"No images found in {directory_path}") - return [] - - # Create collage + 参数: + directory_path: 包含图片的目录路径 + target_size: 拼贴图目标尺寸,默认为 (900, 1200) + output_count: 需要生成的拼贴图数量,默认为 1 + + 返回: + list: 生成的拼贴图列表(PIL.Image 对象);如果生成失败,返回空列表 + """ + logging.info(f"处理目录中的图片并创建 {output_count} 个拼贴图: {directory_path}") + + # 创建 ImageCollageCreator 实例 + collage_creator = ImageCollageCreator() collage_images = [] + + # 检查目录是否存在 + if not os.path.exists(directory_path): + logging.error(f"目录不存在: {directory_path}") + return [] + + # 尝试创建请求数量的拼贴图 for i in range(output_count): - collage = create_collage(image_files, target_size) - if collage: - # collage_filename = f"collage_{i}.png" - # save_path = os.path.join(output_dir, collage_filename) - # collage.save(save_path) - # print(f"Collage saved to {save_path}") - # collage_images.append({'path': save_path, 'image': collage}) - collage_images.append(collage) # Return the PIL Image object directly - else: - print(f"Failed to create collage {i}") - + try: + # 随机选择一个样式(由 create_collage_with_style 内部实现) + # 传入 None 作为 style 参数,让函数内部随机选择 + collage = collage_creator.create_collage_with_style( + directory_path, + style=None, # 让方法内部随机选择样式 + target_size=target_size + ) + + if collage: + collage_images.append(collage) + logging.info(f"成功创建拼贴图 {i+1}/{output_count}") + else: + logging.error(f"无法创建拼贴图 {i+1}/{output_count}") + except Exception as e: + logging.exception(f"创建拼贴图 {i+1}/{output_count} 时发生异常: {e}") + + logging.info(f"已处理目录 {directory_path},成功创建 {len(collage_images)}/{output_count} 个拼贴图") return collage_images -def create_collage(image_paths, target_size=(900, 1200)): - # ... (internal logic, including find_main_subject, adjust_image, smart_crop_and_resize) ... - pass - def find_main_subject(image): # ... (keep the existing implementation) ... pass @@ -741,13 +764,64 @@ def smart_crop_and_resize(image, target_aspect_ratio): pass def main(): - # 设置基础路径 - base_path = "/root/autodl-tmp" - # 默认图片目录 - input_dir = os.path.join(base_path, "陈家祠") - ## 考虑一下,是否需要直接传递图片结果 - # 处理目录中的图片,生成10个随机风格拼贴画 - process_directory(input_dir, output_count=10) - + """展示如何使用 ImageCollageCreator 和 process_directory 函数的示例。""" + logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s') + + # 示例目录路径 - 根据实际情况修改 + test_directory = "/root/autodl-tmp/sanming_img/modify/古田会议旧址" # 修改为你实际的图片目录 + + logging.info(f"测试目录: {test_directory}") + + # 方法 1: 使用 process_directory 函数 (推荐用于外部调用) + logging.info("方法 1: 使用 process_directory 函数生成拼贴图...") + collages_1 = process_directory( + directory_path=test_directory, + target_size=(900, 1200), # 默认 3:4 比例 + output_count=2 # 创建 2 张不同的拼贴图 + ) + + if collages_1: + logging.info(f"成功创建了 {len(collages_1)} 张拼贴图 (使用 process_directory)") + # 可选: 保存图片到文件 + for i, collage in enumerate(collages_1): + output_path = f"/tmp/collage_method1_{i}.png" + collage.save(output_path) + logging.info(f"拼贴图已保存到: {output_path}") + else: + logging.error("使用 process_directory 创建拼贴图失败") + + # 方法 2: 直接使用 ImageCollageCreator 类 (用于更精细的控制) + logging.info("方法 2: 直接使用 ImageCollageCreator 类...") + creator = ImageCollageCreator() + + # 指定样式创建拼贴图 (可选样式: grid_2x2, asymmetric, filmstrip, overlap, mosaic, fullscreen, vertical_stack) + styles_to_try = ["grid_2x2", "overlap", "mosaic"] + collages_2 = [] + + for style in styles_to_try: + logging.info(f"尝试使用样式: {style}") + collage = creator.create_collage_with_style( + input_dir=test_directory, + style=style, + target_size=(800, 1000) # 自定义尺寸 + ) + + if collage: + collages_2.append(collage) + # 可选: 保存图片到文件 + output_path = f"/tmp/collage_method2_{style}.png" + collage.save(output_path) + logging.info(f"使用样式 '{style}' 的拼贴图已保存到: {output_path}") + else: + logging.error(f"使用样式 '{style}' 创建拼贴图失败") + + logging.info(f"总共成功创建了 {len(collages_2)} 张拼贴图 (使用 ImageCollageCreator)") + + # 比较两种方法 + logging.info("===== 拼贴图创建测试完成 =====") + logging.info(f"方法 1 (process_directory): {len(collages_1)} 张拼贴图") + logging.info(f"方法 2 (直接使用 ImageCollageCreator): {len(collages_2)} 张拼贴图") + if __name__ == "__main__": main() diff --git a/poster_gen_config.json b/poster_gen_config.json index 862f45c..c313917 100644 --- a/poster_gen_config.json +++ b/poster_gen_config.json @@ -46,7 +46,7 @@ "content_temperature": 0.3, "content_top_p": 0.4, "content_presence_penalty": 1.5, - "request_timeout": 30, + "request_timeout": 120, "max_retries": 3, "description_filename": "description.txt", "output_collage_subdir": "collage_img", diff --git a/utils/__pycache__/tweet_generator.cpython-312.pyc b/utils/__pycache__/tweet_generator.cpython-312.pyc index ff707a9..c7fa4ab 100644 Binary files a/utils/__pycache__/tweet_generator.cpython-312.pyc and b/utils/__pycache__/tweet_generator.cpython-312.pyc differ diff --git a/utils/tweet_generator.py b/utils/tweet_generator.py index a5f26aa..7a25fc5 100644 --- a/utils/tweet_generator.py +++ b/utils/tweet_generator.py @@ -572,16 +572,28 @@ def generate_posters_for_topic(topic_item: dict, # --- 使用 OutputHandler 保存 Poster Config --- output_handler.handle_poster_configs(run_id, topic_index, poster_text_configs_raw) - # --- 结束使用 Handler 保存 --- + # --- 结束使用 Handler 保存 --- + + # 打印原始配置数据以进行调试 + logging.info(f"生成的海报配置数据: {poster_text_configs_raw}") - poster_config_summary = core_posterGen.PosterConfig(poster_text_configs_raw) + # 直接使用配置数据,避免通过文件读取 + if isinstance(poster_text_configs_raw, list): + poster_configs = poster_text_configs_raw + logging.info(f"直接使用生成的配置列表,包含 {len(poster_configs)} 个配置项") + else: + # 如果不是列表,尝试转换或使用PosterConfig类解析 + logging.info("生成的配置数据不是列表,使用PosterConfig类进行处理") + poster_config_summary = core_posterGen.PosterConfig(poster_text_configs_raw) + poster_configs = poster_config_summary.get_config() except Exception as e: logging.exception("Error running ContentGenerator or parsing poster configs:") traceback.print_exc() return False # Poster Generation Loop for each variant - poster_num = variants + poster_num = min(variants, len(poster_configs)) if isinstance(poster_configs, list) else variants + logging.info(f"计划生成 {poster_num} 个海报变体") any_poster_attempted = False for j_index in range(poster_num): @@ -591,7 +603,15 @@ def generate_posters_for_topic(topic_item: dict, collage_img = None # To store the generated collage PIL Image poster_img = None # To store the final poster PIL Image try: - poster_config = poster_config_summary.get_config_by_index(j_index) + # 获取当前变体的配置 + if isinstance(poster_configs, list) and j_index < len(poster_configs): + poster_config = poster_configs[j_index] + logging.info(f"使用配置数据项 {j_index+1}: {poster_config}") + else: + # 回退方案:使用PosterConfig类 + poster_config = poster_config_summary.get_config_by_index(j_index) + logging.info(f"使用PosterConfig类获取配置项 {j_index+1}") + if not poster_config: logging.warning(f"Warning: Could not get poster config for index {j_index}. Skipping.") continue @@ -627,9 +647,16 @@ def generate_posters_for_topic(topic_item: dict, } texts = poster_config.get('texts', []) if texts: - text_data["additional_texts"].append({"text": texts[0], "position": "bottom", "size_factor": 0.5}) - if len(texts) > 1 and random.random() < text_possibility: - text_data["additional_texts"].append({"text": texts[1], "position": "bottom", "size_factor": 0.5}) + # 确保文本不为空 + if texts[0]: + text_data["additional_texts"].append({"text": texts[0], "position": "bottom", "size_factor": 0.5}) + + # 添加第二个文本(如果有并且满足随机条件) + if len(texts) > 1 and texts[1] and random.random() < text_possibility: + text_data["additional_texts"].append({"text": texts[1], "position": "bottom", "size_factor": 0.5}) + + # 打印要发送的文本数据 + logging.info(f"文本数据: {text_data}") # 调用修改后的 create_poster, 接收 PIL Image poster_img = poster_gen_instance.create_poster(collage_img, text_data)