#!/usr/bin/env python # -*- coding: utf-8 -*- import os import json import logging import traceback from datetime import datetime import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from core.ai_agent import AI_Agent class ContentGenerator: """ 海报文本内容生成器 使用AI_Agent代替直接管理OpenAI客户端,简化代码结构 """ def __init__(self, output_dir="/root/autodl-tmp/poster_generate_result", model_name="qwenQWQ", base_url="http://localhost:8000/v1", api_key="EMPTY", temperature=0.7, top_p=0.8, presence_penalty=1.2): """ 初始化内容生成器 参数: output_dir: 输出结果保存目录 temperature: 生成温度参数 top_p: top_p参数 presence_penalty: 惩罚参数 """ self.output_dir = output_dir self.temperature = temperature self.top_p = top_p self.presence_penalty = presence_penalty self.add_description = "" self.model_name = model_name self.base_url = base_url self.api_key = api_key # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') self.logger = logging.getLogger(__name__) def load_infomation(self, info_directory_path): """ 加载额外描述文件 参数: info_directory_path: 信息目录路径列表 """ self.add_description = "" # 重置描述文本 for path in info_directory_path: try: with open(path, "r", encoding="utf-8") as f: self.add_description += f.read() self.logger.info(f"成功加载描述文件: {path}") except Exception as e: self.logger.warning(f"加载描述文件失败: {path}, 错误: {e}") self.add_description = "" def split_content(self, content): """ 分割结果, 返回去除 ```json ```的json内容 参数: content: 需要分割的内容 返回: 分割后的json内容 """ try: # 记录原始内容的前200个字符(用于调试) self.logger.debug(f"解析内容,原始内容前200字符: {content[:200]}") # 首先尝试直接解析整个内容,以防已经是干净的 JSON try: parsed_data = json.loads(content) # 验证解析后的数据格式 if isinstance(parsed_data, list): # 如果是列表,验证每个元素是否符合预期结构 for item in parsed_data: if isinstance(item, dict) and ('main_title' in item or 'texts' in item): # 至少有一个元素符合海报配置结构 self.logger.info("成功直接解析为JSON格式列表,符合预期结构") return parsed_data # 如果到这里,说明列表内没有符合结构的元素 if len(parsed_data) > 0 and isinstance(parsed_data[0], str): self.logger.warning(f"解析到JSON列表,但内容是字符串列表: {parsed_data}") # 将字符串列表返回供后续修复 return parsed_data self.logger.warning("解析到JSON列表,但结构不符合预期") elif isinstance(parsed_data, dict) and ('main_title' in parsed_data or 'texts' in parsed_data): # 单个字典结构符合预期 self.logger.info("成功直接解析为JSON字典,符合预期结构") return parsed_data # 如果结构不符合预期,记录但仍返回解析结果,交给后续函数修复 self.logger.warning(f"解析到JSON,但结构不完全符合预期: {parsed_data}") return parsed_data except json.JSONDecodeError: # 不是完整有效的JSON,继续尝试提取 self.logger.debug("直接JSON解析失败,尝试提取结构化内容") # 常规模式:查找 ```json 和 ``` 之间的内容 if "```json" in content: json_str = content.split("```json")[1].split("```")[0].strip() try: parsed_json = json.loads(json_str) self.logger.info("成功从```json```代码块提取JSON") return parsed_json except json.JSONDecodeError as e: self.logger.warning(f"从```json```提取的内容解析失败: {e}, 尝试其他方法") # 备用模式1:查找连续的 [ 开头和 ] 结尾的部分 import re json_pattern = r'(\[(?:\s*\{.*?\}\s*,?)+\s*\])' # 更严格的模式,要求[]内至少有一个{}对象 json_matches = re.findall(json_pattern, content, re.DOTALL) for match in json_matches: try: result = json.loads(match) if isinstance(result, list) and len(result) > 0: # 验证结构 for item in result: if isinstance(item, dict) and ('main_title' in item or 'texts' in item): self.logger.info("成功从正则表达式提取JSON数组") return result self.logger.warning("从正则表达式提取的JSON数组不符合预期结构") except Exception as e: self.logger.warning(f"解析正则匹配的内容失败: {e}") continue # 备用模式2:查找 [ 开头 和 ] 结尾,并尝试解析 content = content.strip() square_bracket_start = content.find('[') square_bracket_end = content.rfind(']') if square_bracket_start != -1 and square_bracket_end != -1 and square_bracket_end > square_bracket_start: potential_json = content[square_bracket_start:square_bracket_end + 1] try: result = json.loads(potential_json) if isinstance(result, list): # 检查列表内容 self.logger.info(f"成功从方括号内容提取列表: {result}") return result except Exception as e: self.logger.warning(f"尝试提取方括号内容失败: {e}") # 最后一种尝试:查找所有可能的 JSON 结构并尝试解析 json_structures = re.findall(r'({.*?})', content, re.DOTALL) if json_structures: items = [] for i, struct in enumerate(json_structures): try: item = json.loads(struct) # 验证结构包含预期字段 if isinstance(item, dict) and ('main_title' in item or 'texts' in item): items.append(item) except Exception as e: self.logger.warning(f"解析可能的JSON结构 {i+1} 失败: {e}") continue if items: self.logger.info(f"成功从文本中提取 {len(items)} 个JSON对象") return items # 如果以上所有方法都失败,尝试简单字符串处理 if "|" in content or "必打卡" in content or "性价比" in content: # 这可能是一个简单的标签字符串 self.logger.warning(f"无法提取标准JSON,但发现可能的标签字符串: {content}") return content.strip() # 都失败了,打印错误并引发异常 self.logger.error(f"无法解析内容,返回原始文本: {content[:200]}...") raise ValueError("无法从响应中提取有效的 JSON 格式") except Exception as e: self.logger.error(f"解析内容时出错: {e}") self.logger.debug(f"原始内容: {content[:200]}...") # 仅显示前200个字符 return content.strip() # 返回原始内容,让后续验证函数处理 def generate_posters(self, poster_num, content_data_list, system_prompt=None, api_url=None, model_name=None, api_key=None, timeout=60, max_retries=3): """ 生成海报内容 参数: poster_num: 海报数量 content_data_list: 内容数据列表(字典或字符串) system_prompt: 系统提示,默认为None则使用预设提示 api_url: API基础URL model_name: 使用的模型名称 api_key: API密钥 timeout: 请求超时时间 max_retries: 最大重试次数 返回: 生成的海报内容 """ # 构建默认系统提示词 if not system_prompt: system_prompt = """ 你是一名资深海报设计师,有丰富的爆款海报设计经验,你现在要为旅游景点做宣传,在小红书上发布大量宣传海报。你的主要工作目标有2个: 1、你要根据我给你的图片描述和笔记推文内容,设计图文匹配的海报。 2、为海报设计文案,文案的<第一个小标题>和<第二个小标题>之间你需要检查是否逻辑关系合理,你将通过先去生成<第二个小标题>关于景区亮点的部分,再去综合判断<第一个小标题>应该如何搭配组合更符合两个小标题的逻辑再生成<第一个小标题>。 其中,生成三类标题文案的通用性要求如下: 1、生成的<大标题>字数必须小于8个字符 2、生成的<第一个小标题>字数和<第二个小标题>字数,两者都必须小8个字符 3、标题和文案都应符合中国社会主义核心价值观 接下来先开始生成<大标题>部分,由于海报是用来宣传旅游景点,生成的海报<大标题>必须使用以下8种格式之一: ①地名+景点名(例如福建厦门鼓浪屿/厦门鼓浪屿); ②地名+景点名+plog; ③拿捏+地名+景点名; ④地名+景点名+攻略; ⑤速通+地名+景点名 ⑥推荐!+地名+景点名 ⑦勇闯!+地名+景点名 ⑧收藏!+地名+景点名 你需要随机挑选一种格式生成对应景点的文案,但是格式除了上面8种不可以有其他任何格式;同时尽量保证每一种格式出现的频率均衡。 接下来先去生成<第二个小标题>,<第二个小标题>文案的创作必须遵循以下原则: 请根据笔记内容和图片识别,用极简的文字概括这篇笔记和图片中景点的特色亮点,其中你可以参考以下词汇进行创作,这段文案字数控制6-8字符以内; 特色亮点可能会出现的词汇不完全举例:非遗、古建、绝佳山水、祈福圣地、研学圣地、解压天堂、中国小瑞士、秘境竹筏游等等类型词汇 接下来再去生成<第一个小标题>,<第一个小标题>文案的创作必须遵循以下原则: 这部分文案创作公式有5种,分别为: ①<受众人群画像>+<痛点词> ②<受众人群画像> ③<痛点词> ④<受众人群画像>+ | +<痛点词> ⑤<痛点词>+ | +<受众人群画像> 请你根据实际笔记内容,结合这部分文案创作公式,需要结合<受众人群画像>和<痛点词>时,必须根据<第二个小标题>的景点特征和所对应的完整笔记推文内容主旨,特征挑选对应<受众人群画像>和<痛点词>。 我给你提供受众人群画像库和痛点词库如下: 1、受众人群画像库:情侣党、亲子游、合家游、银发族、亲子研学、学生党、打工人、周边游、本地人、穷游党、性价比、户外人、美食党、出片 2、痛点词库:3天2夜、必去、看了都哭了、不能错过、一定要来、问爆了、超全攻略、必打卡、强推、懒人攻略、必游榜、小众打卡、狂喜等等。 你需要为每个请求至少生成{poster_num}个海报设计。请使用JSON格式输出结果,结构如下: [ { "index": 1, "main_title": "主标题内容", "texts": ["第一个小标题", "第二个小标题"] }, { "index": 2, "main_title": "主标题内容", "texts": ["第一个小标题", "第二个小标题"] }, // ... 更多海报 ] 确保生成的数量与用户要求的数量一致。只生成上述JSON格式内容,不要有其他任何额外内容。 """ # 提取内容文本(如果是列表内容数据) tweet_content = "" if isinstance(content_data_list, list): for item in content_data_list: if isinstance(item, dict): title = item.get('title', '') content = item.get('content', '') tweet_content += f"\n{title}\n\n\n{content}\n\n\n" elif isinstance(item, str): tweet_content += item + "\n\n" elif isinstance(content_data_list, str): tweet_content = content_data_list # 构建用户提示 if self.add_description: user_content = f""" 以下是需要你处理的信息: 关于景点的描述: {self.add_description} 推文内容: {tweet_content} 请根据这些信息,生成{poster_num}个海报文案配置,以JSON数组格式返回。 """ else: user_content = f""" 以下是需要你处理的推文内容: {tweet_content} 请根据这些信息,生成{poster_num}个海报文案配置,以JSON数组格式返回。 """ self.logger.info(f"正在生成{poster_num}个海报文案配置") # 创建AI_Agent实例 ai_agent = AI_Agent( self.base_url, self.model_name, self.api_key, timeout=timeout, max_retries=max_retries, stream_chunk_timeout=30 # 流式块超时时间 ) full_response = "" try: # 使用AI_Agent的non-streaming方法 self.logger.info(f"调用AI生成海报配置,模型: {self.model_name}") full_response, tokens, time_cost = ai_agent.work( system_prompt, user_content, "", # 历史消息(空) self.temperature, self.top_p, self.presence_penalty ) self.logger.info(f"AI生成完成,耗时: {time_cost:.2f}s, 预估令牌数: {tokens}") if not full_response: self.logger.warning("AI返回空响应,使用备用内容") full_response = self._generate_fallback_content(poster_num) except Exception as e: self.logger.exception(f"AI生成过程发生错误: {e}") full_response = self._generate_fallback_content(poster_num) finally: # 确保关闭AI Agent ai_agent.close() return full_response def _generate_fallback_content(self, poster_num): """生成备用内容,当API调用失败时使用""" self.logger.info("生成备用内容") default_configs = [] for i in range(poster_num): default_configs.append({ "index": i + 1, "main_title": "", "texts": ["", ""] }) return json.dumps(default_configs, ensure_ascii=False) def save_result(self, full_response, custom_output_dir=None): """ 保存生成结果到文件 参数: full_response: 生成的完整响应内容 custom_output_dir: 自定义输出目录(可选) 返回: 结果文件路径 """ # 生成时间戳 date_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") output_dir = custom_output_dir or self.output_dir try: # 解析内容为JSON格式 parsed_data = self.split_content(full_response) # 验证内容格式并修复 validated_data = self._validate_and_fix_data(parsed_data) # 创建结果文件路径 result_path = os.path.join(output_dir, f"{date_time}.json") os.makedirs(os.path.dirname(result_path), exist_ok=True) # 保存结果到文件 with open(result_path, "w", encoding="utf-8") as f: json.dump(validated_data, f, ensure_ascii=False, indent=4) self.logger.info(f"结果已保存到: {result_path}") return result_path except Exception as e: self.logger.error(f"保存结果到文件时出错: {e}") # 尝试创建一个简单的备用配置 fallback_data = [{"main_title": "", "texts": ["", ""], "index": 1}] # 保存备用数据 result_path = os.path.join(output_dir, f"{date_time}_fallback.json") os.makedirs(os.path.dirname(result_path), exist_ok=True) with open(result_path, "w", encoding="utf-8") as f: json.dump(fallback_data, f, ensure_ascii=False, indent=4) self.logger.info(f"出错后已保存备用数据到: {result_path}") return result_path def _validate_and_fix_data(self, data): """ 验证并修复数据格式,确保符合预期结构 参数: data: 需要验证的数据 返回: 修复后的数据 """ fixed_data = [] # 记录原始数据格式信息 self.logger.info(f"验证和修复数据,原始数据类型: {type(data)}") if isinstance(data, list): self.logger.info(f"原始数据是列表,长度: {len(data)}") if len(data) > 0: self.logger.info(f"第一个元素类型: {type(data[0])}") elif isinstance(data, str): self.logger.info(f"原始数据是字符串: {data[:100]}") else: self.logger.info(f"原始数据是其他类型: {data}") # 如果数据是列表 if isinstance(data, list): for i, item in enumerate(data): # 检查项目是否为字典 if isinstance(item, dict): # 确保必需字段存在 fixed_item = { "index": item.get("index", i + 1), "main_title": item.get("main_title", ""), "texts": item.get("texts", ["", ""]) } # 确保texts是列表格式 if not isinstance(fixed_item["texts"], list): if isinstance(fixed_item["texts"], str): fixed_item["texts"] = [fixed_item["texts"], ""] else: fixed_item["texts"] = ["", ""] # 限制texts最多包含两个元素 if len(fixed_item["texts"]) > 2: fixed_item["texts"] = fixed_item["texts"][:2] elif len(fixed_item["texts"]) < 2: while len(fixed_item["texts"]) < 2: fixed_item["texts"].append("") fixed_data.append(fixed_item) # 如果项目是字符串(可能是错误格式的texts值) elif isinstance(item, str): self.logger.warning(f"配置项 {i+1} 是字符串格式: '{item}',将转换为标准格式") # 尝试解析字符串格式,例如"性价比|必打卡" texts = [] if "|" in item: texts = item.split("|") else: texts = [item, ""] fixed_item = { "index": i + 1, "main_title": "", "texts": texts } fixed_data.append(fixed_item) else: self.logger.warning(f"配置项 {i+1} 格式不支持: {type(item)},将使用默认值") fixed_data.append({ "index": i + 1, "main_title": "", "texts": ["", ""] }) # 如果数据是字典 elif isinstance(data, dict): fixed_item = { "index": data.get("index", 1), "main_title": data.get("main_title", ""), "texts": data.get("texts", ["", ""]) } # 确保texts是列表格式 if not isinstance(fixed_item["texts"], list): if isinstance(fixed_item["texts"], str): fixed_item["texts"] = [fixed_item["texts"], ""] else: fixed_item["texts"] = ["", ""] # 限制texts最多包含两个元素 if len(fixed_item["texts"]) > 2: fixed_item["texts"] = fixed_item["texts"][:2] elif len(fixed_item["texts"]) < 2: while len(fixed_item["texts"]) < 2: fixed_item["texts"].append("") fixed_data.append(fixed_item) # 如果数据是字符串 elif isinstance(data, str): self.logger.warning(f"数据是字符串格式: '{data}',尝试转换为标准格式") # 尝试解析字符串格式,例如"性价比|必打卡" texts = [] if "|" in data: texts = data.split("|") else: texts = [data, ""] fixed_data.append({ "index": 1, "main_title": "", "texts": texts }) # 如果数据是其他格式 else: self.logger.warning(f"数据格式不支持: {type(data)},将使用默认值") fixed_data.append({ "index": 1, "main_title": "", "texts": ["", ""] }) # 确保至少有一个配置项 if not fixed_data: fixed_data.append({ "index": 1, "main_title": "", "texts": ["", ""] }) self.logger.info(f"修复后的数据: {fixed_data}") return fixed_data def run(self, info_directory, poster_num, content_data, system_prompt=None, api_url="http://localhost:8000/v1", model_name="qwenQWQ", api_key="EMPTY", timeout=120): """ 运行海报内容生成流程,并返回生成的配置数据。 参数: info_directory: 信息目录路径列表 (e.g., ['/path/to/description.txt']) poster_num: 需要生成的海报配置数量 content_data: 用于生成内容的文章内容(可以是字符串或字典列表) system_prompt: 系统提示词,默认为None使用内置提示词 api_url: API基础URL model_name: 使用的模型名称 api_key: API密钥 返回: list | dict | None: 生成的海报配置数据 (通常是列表),如果生成或解析失败则返回 None。 """ try: # 加载描述信息 self.load_infomation(info_directory) # 生成海报内容 self.logger.info(f"开始生成海报内容,数量: {poster_num}") full_response = self.generate_posters( poster_num, content_data, system_prompt, api_url, model_name, api_key, timeout=timeout, ) # 检查生成是否失败 if not isinstance(full_response, str) or not full_response.strip(): self.logger.error("海报内容生成失败或返回空响应") return None # 从原始响应字符串中提取JSON数据 result_data = self.split_content(full_response) # 验证并修复数据 fixed_data = self._validate_and_fix_data(result_data) self.logger.info(f"成功生成并修复海报配置数据,包含 {len(fixed_data) if isinstance(fixed_data, list) else 1} 个项目") return fixed_data except Exception as e: self.logger.exception(f"海报内容生成过程中发生错误: {e}") traceback.print_exc() # 失败后创建一个默认配置 self.logger.info("创建默认海报配置数据") default_configs = [] for i in range(poster_num): default_configs.append({ "index": i + 1, "main_title": "", "texts": ["", ""] }) return default_configs def set_temperature(self, temperature): """设置温度参数""" self.temperature = temperature def set_top_p(self, top_p): """设置top_p参数""" self.top_p = top_p def set_presence_penalty(self, presence_penalty): """设置存在惩罚参数""" self.presence_penalty = presence_penalty def set_model_para(self, temperature, top_p, presence_penalty): """一次性设置所有模型参数""" self.temperature = temperature self.top_p = top_p self.presence_penalty = presence_penalty