diff --git a/utils/__pycache__/content_generator.cpython-312.pyc b/utils/__pycache__/content_generator.cpython-312.pyc
index 23465af..6367136 100644
Binary files a/utils/__pycache__/content_generator.cpython-312.pyc and b/utils/__pycache__/content_generator.cpython-312.pyc differ
diff --git a/utils/__pycache__/content_judger.cpython-312.pyc b/utils/__pycache__/content_judger.cpython-312.pyc
index 3b99321..a00ffc6 100644
Binary files a/utils/__pycache__/content_judger.cpython-312.pyc and b/utils/__pycache__/content_judger.cpython-312.pyc differ
diff --git a/utils/__pycache__/output_handler.cpython-312.pyc b/utils/__pycache__/output_handler.cpython-312.pyc
index 85bb16d..7720a1e 100644
Binary files a/utils/__pycache__/output_handler.cpython-312.pyc and b/utils/__pycache__/output_handler.cpython-312.pyc differ
diff --git a/utils/__pycache__/tweet_generator.cpython-312.pyc b/utils/__pycache__/tweet_generator.cpython-312.pyc
index f5acb02..25c416b 100644
Binary files a/utils/__pycache__/tweet_generator.cpython-312.pyc and b/utils/__pycache__/tweet_generator.cpython-312.pyc differ
diff --git a/utils/content_generator.py b/utils/content_generator.py
index 22beb2f..e2a9b93 100644
--- a/utils/content_generator.py
+++ b/utils/content_generator.py
@@ -193,108 +193,66 @@ class ContentGenerator:
self.logger.debug(f"原始内容: {content[:200]}...") # 仅显示前200个字符
return content.strip() # 返回原始内容,让后续验证函数处理
- def generate_posters(self,
- poster_num,
- content_data_list,
- system_prompt=None,
- api_url=None,
- model_name=None,
- api_key=None,
- timeout=60,
- max_retries=3):
+ def _preprocess_for_json(self, text):
+ """预处理文本,将换行符转换为\\n形式,保证JSON安全"""
+ if not isinstance(text, str):
+ return text
+ # 将所有实际换行符替换为\\n字符串
+ return text.replace('\n', '\\n').replace('\r', '\\r')
+
+ def generate_posters(self, poster_num, content_data_list, system_prompt=None,
+ api_url=None, model_name=None, api_key=None, timeout=120, max_retries=3):
"""
- 生成海报内容
-
- 参数:
- poster_num: 海报数量
- content_data_list: 内容数据列表(字典或字符串)
- system_prompt: 系统提示,默认为None则使用预设提示
- api_url: API基础URL
- model_name: 使用的模型名称
- api_key: API密钥
- timeout: 请求超时时间
- max_retries: 最大重试次数
+ 生成海报配置
- 返回:
- 生成的海报内容
+ Args:
+ poster_num: 生成的海报数量
+ content_data_list: 内容数据列表
+ system_prompt: 系统提示词(可选)
+ api_url: API基础URL(可选)
+ model_name: 模型名称(可选)
+ api_key: API密钥(可选)
+
+ Returns:
+ str: 生成的配置JSON字符串
"""
- # 构建默认系统提示词
- if not system_prompt:
- system_prompt = """
- 你是一名资深海报设计师,有丰富的爆款海报设计经验,你现在要为旅游景点做宣传,在小红书上发布大量宣传海报。你的主要工作目标有2个:
- 1、你要根据我给你的图片描述和笔记推文内容,设计图文匹配的海报。
- 2、为海报设计文案,文案的<第一个小标题>和<第二个小标题>之间你需要检查是否逻辑关系合理,你将通过先去生成<第二个小标题>关于景区亮点的部分,再去综合判断<第一个小标题>应该如何搭配组合更符合两个小标题的逻辑再生成<第一个小标题>。
-
- 其中,生成三类标题文案的通用性要求如下:
- 1、生成的<大标题>字数必须小于8个字符
- 2、生成的<第一个小标题>字数和<第二个小标题>字数,两者都必须小8个字符
- 3、标题和文案都应符合中国社会主义核心价值观
-
- 接下来先开始生成<大标题>部分,由于海报是用来宣传旅游景点,生成的海报<大标题>必须使用以下8种格式之一:
- ①地名+景点名(例如福建厦门鼓浪屿/厦门鼓浪屿);
- ②地名+景点名+plog;
- ③拿捏+地名+景点名;
- ④地名+景点名+攻略;
- ⑤速通+地名+景点名
- ⑥推荐!+地名+景点名
- ⑦勇闯!+地名+景点名
- ⑧收藏!+地名+景点名
- 你需要随机挑选一种格式生成对应景点的文案,但是格式除了上面8种不可以有其他任何格式;同时尽量保证每一种格式出现的频率均衡。
- 接下来先去生成<第二个小标题>,<第二个小标题>文案的创作必须遵循以下原则:
- 请根据笔记内容和图片识别,用极简的文字概括这篇笔记和图片中景点的特色亮点,其中你可以参考以下词汇进行创作,这段文案字数控制6-8字符以内;
-
- 特色亮点可能会出现的词汇不完全举例:非遗、古建、绝佳山水、祈福圣地、研学圣地、解压天堂、中国小瑞士、秘境竹筏游等等类型词汇
-
- 接下来再去生成<第一个小标题>,<第一个小标题>文案的创作必须遵循以下原则:
- 这部分文案创作公式有5种,分别为:
- ①<受众人群画像>+<痛点词>
- ②<受众人群画像>
- ③<痛点词>
- ④<受众人群画像>+ | +<痛点词>
- ⑤<痛点词>+ | +<受众人群画像>
- 请你根据实际笔记内容,结合这部分文案创作公式,需要结合<受众人群画像>和<痛点词>时,必须根据<第二个小标题>的景点特征和所对应的完整笔记推文内容主旨,特征挑选对应<受众人群画像>和<痛点词>。
-
- 我给你提供受众人群画像库和痛点词库如下:
- 1、受众人群画像库:情侣党、亲子游、合家游、银发族、亲子研学、学生党、打工人、周边游、本地人、穷游党、性价比、户外人、美食党、出片
- 2、痛点词库:3天2夜、必去、看了都哭了、不能错过、一定要来、问爆了、超全攻略、必打卡、强推、懒人攻略、必游榜、小众打卡、狂喜等等。
-
- 你需要为每个请求至少生成{poster_num}个海报设计。请使用JSON格式输出结果,结构如下:
- [
- {
- "index": 1,
- "main_title": "主标题内容",
- "texts": ["第一个小标题", "第二个小标题"]
- },
- {
- "index": 2,
- "main_title": "主标题内容",
- "texts": ["第一个小标题", "第二个小标题"]
- },
- // ... 更多海报
- ]
- 确保生成的数量与用户要求的数量一致。只生成上述JSON格式内容,不要有其他任何额外内容。
- """
+ # 更新API设置
+ if api_url:
+ self.base_url = api_url
+ if model_name:
+ self.model_name = model_name
+ if api_key:
+ self.api_key = api_key
+
+ # 使用系统提示或默认提示
+ if system_prompt:
+ self.system_prompt = system_prompt
+ elif not self.system_prompt:
+ self.system_prompt = """你是一名专业的旅游景点海报文案创作专家。你的任务是根据提供的旅游景点信息和推文内容,生成海报文案配置。你的回复必须是一个JSON数组,每一项表示一个海报配置,包含'index'、'main_title'和'texts'三个字段,其中'texts'是一个字符串数组。海报文案要简洁有力,突出景点特色和吸引力。"""
# 提取内容文本(如果是列表内容数据)
tweet_content = ""
if isinstance(content_data_list, list):
for item in content_data_list:
if isinstance(item, dict):
- title = item.get('title', '')
- content = item.get('content', '')
+ # 对标题和内容进行预处理,替换换行符
+ title = self._preprocess_for_json(item.get('title', ''))
+ content = self._preprocess_for_json(item.get('content', ''))
tweet_content += f"
\n{title}\n\n\n{content}\n\n\n"
elif isinstance(item, str):
- tweet_content += item + "\n\n"
+ tweet_content += self._preprocess_for_json(item) + "\n\n"
elif isinstance(content_data_list, str):
- tweet_content = content_data_list
+ tweet_content = self._preprocess_for_json(content_data_list)
# 构建用户提示
if self.add_description:
+ # 预处理景点描述
+ processed_description = self._preprocess_for_json(self.add_description)
user_content = f"""
以下是需要你处理的信息:
关于景点的描述:
- {self.add_description}
+ {processed_description}
推文内容:
{tweet_content}
@@ -326,7 +284,7 @@ class ContentGenerator:
# 使用AI_Agent的non-streaming方法
self.logger.info(f"调用AI生成海报配置,模型: {self.model_name}")
full_response, tokens, time_cost = ai_agent.work(
- system_prompt,
+ self.system_prompt,
user_content,
"", # 历史消息(空)
self.temperature,
@@ -409,118 +367,105 @@ class ContentGenerator:
def _validate_and_fix_data(self, data):
"""
- 验证并修复数据格式,确保符合预期结构
-
- 参数:
+ 验证并修复从AI返回的数据,确保其符合期望的结构
+
+ Args:
data: 需要验证的数据
-
- 返回:
- 修复后的数据
+
+ Returns:
+ list: 修复后的数据列表
"""
fixed_data = []
+ self.logger.info(f"验证并修复数据: {type(data)}")
- # 记录原始数据格式信息
- self.logger.info(f"验证和修复数据,原始数据类型: {type(data)}")
- if isinstance(data, list):
- self.logger.info(f"原始数据是列表,长度: {len(data)}")
- if len(data) > 0:
- self.logger.info(f"第一个元素类型: {type(data[0])}")
- elif isinstance(data, str):
- self.logger.info(f"原始数据是字符串: {data[:100]}")
- else:
- self.logger.info(f"原始数据是其他类型: {data}")
+ # 尝试处理字符串类型 (通常是JSON字符串)
+ if isinstance(data, str):
+ try:
+ # 尝试将字符串解析为JSON对象
+ parsed_data = json.loads(data)
+ # 递归调用本函数处理解析后的数据
+ return self._validate_and_fix_data(parsed_data)
+ except json.JSONDecodeError as e:
+ self.logger.warning(f"JSON解析失败: {e}")
+ # 可以选择尝试清理和再次解析
+ try:
+ # 寻找字符串中第一个 [ 和最后一个 ] 之间的内容
+ start_idx = data.find('[')
+ end_idx = data.rfind(']')
+ if start_idx >= 0 and end_idx > start_idx:
+ json_part = data[start_idx:end_idx+1]
+ self.logger.info(f"尝试从字符串中提取JSON部分: {json_part[:100]}...")
+ parsed_data = json.loads(json_part)
+ return self._validate_and_fix_data(parsed_data)
+ except:
+ self.logger.warning("无法从字符串中提取有效的JSON部分")
+ fixed_data.append({
+ "index": 1,
+ "main_title": self._preprocess_for_json("默认标题"), # 应用预处理
+ "texts": [self._preprocess_for_json("默认副标题1"), self._preprocess_for_json("默认副标题2")] # 应用预处理
+ })
- # 如果数据是列表
- if isinstance(data, list):
- for i, item in enumerate(data):
- # 检查项目是否为字典
+ # 处理列表类型
+ elif isinstance(data, list):
+ for idx, item in enumerate(data):
+ # 如果是字典,检查必须字段
if isinstance(item, dict):
- # 确保必需字段存在
- fixed_item = {
- "index": item.get("index", i + 1),
- "main_title": item.get("main_title", ""),
- "texts": item.get("texts", ["", ""])
- }
+ fixed_item = {}
+ # 设置索引
+ fixed_item["index"] = item.get("index", idx + 1)
- # 确保texts是列表格式
- if not isinstance(fixed_item["texts"], list):
- if isinstance(fixed_item["texts"], str):
- fixed_item["texts"] = [fixed_item["texts"], ""]
- else:
- fixed_item["texts"] = ["", ""]
-
- # 限制texts最多包含两个元素
- if len(fixed_item["texts"]) > 2:
- fixed_item["texts"] = fixed_item["texts"][:2]
- elif len(fixed_item["texts"]) < 2:
+ # 处理主标题
+ if "main_title" in item and item["main_title"]:
+ # 应用预处理,确保所有换行符被正确转义
+ fixed_item["main_title"] = self._preprocess_for_json(item["main_title"])
+ else:
+ fixed_item["main_title"] = "默认标题"
+
+ # 处理文本列表
+ if "texts" in item and isinstance(item["texts"], list) and len(item["texts"]) > 0:
+ # 对文本列表中的每个元素应用预处理
+ fixed_item["texts"] = [self._preprocess_for_json(text) if text else "" for text in item["texts"]]
+ # 确保至少有两个元素
while len(fixed_item["texts"]) < 2:
fixed_item["texts"].append("")
-
- fixed_data.append(fixed_item)
-
- # 如果项目是字符串(可能是错误格式的texts值)
- elif isinstance(item, str):
- self.logger.warning(f"配置项 {i+1} 是字符串格式: '{item}',将转换为标准格式")
-
- # 尝试解析字符串格式,例如"性价比|必打卡"
- texts = []
- if "|" in item:
- texts = item.split("|")
else:
- texts = [item, ""]
+ fixed_item["texts"] = ["默认副标题1", "默认副标题2"]
- fixed_item = {
- "index": i + 1,
- "main_title": "",
- "texts": texts
- }
fixed_data.append(fixed_item)
- else:
- self.logger.warning(f"配置项 {i+1} 格式不支持: {type(item)},将使用默认值")
+
+ # 如果是字符串,转换为默认格式
+ elif isinstance(item, str):
fixed_data.append({
- "index": i + 1,
- "main_title": "",
+ "index": idx + 1,
+ "main_title": self._preprocess_for_json(item), # 应用预处理
+ "texts": ["", ""]
+ })
+
+ # 其他类型,使用默认值
+ else:
+ fixed_data.append({
+ "index": idx + 1,
+ "main_title": "默认标题",
"texts": ["", ""]
})
- # 如果数据是字典
+ # 处理字典类型 (单个配置项)
elif isinstance(data, dict):
- fixed_item = {
- "index": data.get("index", 1),
- "main_title": data.get("main_title", ""),
- "texts": data.get("texts", ["", ""])
- }
+ # 处理主标题
+ main_title = self._preprocess_for_json(data.get("main_title", "默认标题")) # 应用预处理
- # 确保texts是列表格式
- if not isinstance(fixed_item["texts"], list):
- if isinstance(fixed_item["texts"], str):
- fixed_item["texts"] = [fixed_item["texts"], ""]
- else:
- fixed_item["texts"] = ["", ""]
-
- # 限制texts最多包含两个元素
- if len(fixed_item["texts"]) > 2:
- fixed_item["texts"] = fixed_item["texts"][:2]
- elif len(fixed_item["texts"]) < 2:
- while len(fixed_item["texts"]) < 2:
- fixed_item["texts"].append("")
-
- fixed_data.append(fixed_item)
-
- # 如果数据是字符串
- elif isinstance(data, str):
- self.logger.warning(f"数据是字符串格式: '{data}',尝试转换为标准格式")
-
- # 尝试解析字符串格式,例如"性价比|必打卡"
+ # 处理文本列表
texts = []
- if "|" in data:
- texts = data.split("|")
- else:
- texts = [data, ""]
+ if "texts" in data and isinstance(data["texts"], list):
+ texts = [self._preprocess_for_json(text) if text else "" for text in data["texts"]] # 应用预处理
+
+ # 确保文本列表至少有两个元素
+ while len(texts) < 2:
+ texts.append("")
fixed_data.append({
- "index": 1,
- "main_title": "",
+ "index": data.get("index", 1),
+ "main_title": main_title,
"texts": texts
})
diff --git a/utils/content_judger.py b/utils/content_judger.py
index d1e1e06..feab470 100644
--- a/utils/content_judger.py
+++ b/utils/content_judger.py
@@ -4,13 +4,14 @@
内容审核模块:检查生成的内容是否符合产品资料要求并提供修改建议
"""
-import simplejson as json
+import json
import logging
-import re
import os
import time
import traceback
import sys
+import base64
+import re
sys.path.append('/root/autodl-tmp/TravelContentCreator') # 添加项目根目录
from core.ai_agent import AI_Agent
@@ -107,18 +108,8 @@ class ContentJudger:
}
输出结果:
-{ "analysis" : "
- 1、观察文案标题和内容,可以看出此文案主要面向亲子出游人群,因此修改后的文案也应该围绕亲子出游这一主题。
- 2、文章标题字数为28个字,超过19个字,因此属于不符内容。由于要求中提到尽量保留emoji,并且标题中数字后面的"元"字应删去,所以修改为:五一遛娃👶必囤!喜来登1088景观房
- 3、产品资料中未提及儿童乐园开放时间和儿童乐园配置,但文案中提到儿童乐园10:00-20:00全程开放,滑梯/积木/绘本一应俱全,因此属于不符内容。应修改为:儿童乐园:免费儿童乐园和丰富的游乐设施,让孩子们可以尽情玩耍。
- 4、产品材料中未提及户外泳池开放时间和消毒频次,但文案中提到户外泳池:9:00-18:00恒温开放(五一期间每日消毒3次),因此属于不符内容。应修改为:户外泳池:酒店配有户外无边泳池,供大人小孩一同享受清凉时光。
- 5、产品材料中未提及健身房开放时间与具体细节,但文案中提到健身房:8:00-22:00配备亲子瑜伽课程(需提前预约),因此属于不符内容。应修改为:健身房:酒店提供免费健身中心,方便您和家人一起强身健体。
- 6、产品材料中未提及餐厅硬件配置,但文案中提到自助晚餐隐藏彩蛋:儿童餐区设独立洗手台+热食保温柜,因此属于虚构内容。应修改为:自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃!
- 7、产品材料中未提及酒店安保措施,但文案中提到安全保障:全区域监控+24小时安保巡逻,因此属于不符内容。应修改为:安全保障:酒店设有完善的监控系统和安保措施,无需担心您与家人的安全。
- 8、产品材料中未提及房内配有加厚床垫/卡通洗漱杯/尿布台(无需额外购买),因此属于不符内容。应回顾产品资料中关于房内配置的内容,修改为:房内配置:55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光。
- 9、产品材料中未提及五一专属加码,但文案中提到5月1-5日期间入住,凭房卡可免费领取儿童防晒冰袖+湿巾礼包,因此属于不符内容。应回顾产品资料,找到现有文案未提及的产品特色,修改为:套餐专属福利:1、豪华客房一间一晚(周一至四只开放双床房) 2、2大1小自助早晚餐 3、赠送2大1小水鸟世界门票(酒店前台领取),无需额外购买
- 10、产品资料中未提及水鸟世界门票领取有时间限制,但文案中提到水鸟世界门票需提前1小时至前台领取纸质票,因此属于不符内容。应修改为:酒店前台领取水鸟世界纸质门票
- 综合以上分析结果,将修改应用到原文案中,得到修改后的文案。",
+{
+ "analysis" : "1、观察文案标题和内容,可以看出此文案主要面向亲子出游人群,因此修改后的文案也应该围绕亲子出游这一主题。\n2、文章标题字数为28个字,超过19个字,因此属于不符内容。由于要求中提到尽量保留emoji,并且标题中数字后面的"元"字应删去,所以修改为:五一遛娃👶必囤!喜来登1088景观房\n3、产品资料中未提及儿童乐园开放时间和儿童乐园配置,但文案中提到儿童乐园10:00-20:00全程开放,滑梯/积木/绘本一应俱全,因此属于不符内容。应修改为:儿童乐园:免费儿童乐园和丰富的游乐设施,让孩子们可以尽情玩耍。\n4、产品材料中未提及户外泳池开放时间和消毒频次,但文案中提到户外泳池:9:00-18:00恒温开放(五一期间每日消毒3次),因此属于不符内容。应修改为:户外泳池:酒店配有户外无边泳池,供大人小孩一同享受清凉时光。 \n5、产品材料中未提及健身房开放时间与具体细节,但文案中提到健身房:8:00-22:00配备亲子瑜伽课程(需提前预约),因此属于不符内容。应修改为:健身房:酒店提供免费健身中心,方便您和家人一起强身健体。\n6、产品材料中未提及餐厅硬件配置,但文案中提到自助晚餐隐藏彩蛋:儿童餐区设独立洗手台+热食保温柜,因此属于虚构内容。应修改为:自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃!\n7、产品材料中未提及酒店安保措施,但文案中提到安全保障:全区域监控+24小时安保巡逻,因此属于不符内容。应修改为:安全保障:酒店设有完善的监控系统和安保措施,无需担心您与家人的安全。\n8、产品材料中未提及房内配有加厚床垫/卡通洗漱杯/尿布台(无需额外购买),因此属于不符内容。应回顾产品资料中关于房内配置的内容,修改为:房内配置:55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光。\n9、产品材料中未提及五一专属加码,但文案中提到5月1-5日期间入住,凭房卡可免费领取儿童防晒冰袖+湿巾礼包,因此属于不符内容。应回顾产品资料,找到现有文案未提及的产品特色,修改为:套餐专属福利:1、豪华客房一间一晚(周一至四只开放双床房) 2、2大1小自助早晚餐 3、赠送2大1小水鸟世界门票(酒店前台领取),无需额外购买。\n10、产品资料中未提及水鸟世界门票领取有时间限制,但文案中提到水鸟世界门票需提前1小时至前台领取纸质票,因此属于不符内容。应修改为:酒店前台领取水鸟世界纸质门票\n综合以上分析结果,将修改应用到原文案中,得到修改后的文案。",
"title": "五一遛娃👶必囤!喜来登1088景观房",
"content": "五一不想挤人潮?南沙这家酒店直接承包遛娃+度假双重快乐‼️\n地铁直达!2大1小1088r住景观房,含双早+自助晚餐+水鸟世界门票,儿童乐园/泳池/健身房全开放!\n🌟【遛娃刚需全配齐】\n✅ 儿童乐园:酒店设有免费儿童乐园,提供丰富的游乐设施,让孩子们尽情玩耍\n✅ 户外泳池:酒店配有户外无边泳池,供大人小孩一同享受清凉时光 \n✅ 健身房:酒店提供免费健身中心,适合家庭成员共同锻炼。\n\n📍【1小时玩转南沙】\n① 南沙天后宫(车程20分钟):穿汉服拍大片,听妈祖传说涨知识\n② 南沙湿地公园(40分钟):5月芦苇摇曳,带娃认鸟类+乘船探秘\n③ 十九涌海鲜街(45分钟):现捞现煮生猛海鲜,人均50r吃到撑 \n\n🍽️【家长友好细节】 \n• 自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃 \n• 房内配置:55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光 \n• 安全保障:酒店设有完善的监控系统和安保措施,全力保障您与家人的安全 \n\n🎁【套餐专属福利】\n1、豪华客房一间一晚(周一至四只开放双床房) \n2、2大1小自助早晚餐 \n3、赠送2大1小水鸟世界门票(酒店前台领取),无需额外购买 \n\n📌Tips: \n1. 周一至周四仅限双床房型,周五起可选大床房 \n2. 酒店前台领取水鸟世界纸质门票 \n3. 地铁四号线金洲站下车,打车15分钟直达酒店 \n\n这个五一,南沙喜来登让你躺着遛娃!不用长途跋涉,家门口就能玩出仪式感~\n"
}
@@ -163,125 +154,157 @@ class ContentJudger:
logging.error(f"从PromptManager获取系统提示词失败: {e}")
return False
- def judge_content(self, product_info, content, temperature=0.2, top_p=0.5, presence_penalty=0.0):
+ def _split_content(self, result):
"""
- 审核内容是否符合产品资料并提供修改建议
+ 参考tweet_generator的处理方式,解析AI返回的内容
Args:
- product_info: 产品资料信息字符串
- content_json: 需要审核的内容JSON对象或JSON字符串
- temperature: 温度参数,控制随机性
- top_p: 核采样参数
- presence_penalty: 存在惩罚参数
+ result: AI返回的原始结果
Returns:
- dict: 审核后的结果JSON,包含修改后的title和content以及judge_success状态
+ dict: 解析后的JSON数据
"""
+ try:
+ # 处理AI可能返回的思考部分
+ processed_result = result
+ if "" in result:
+ processed_result = result.split("")[1] # 取标签后的内容
+
+ # 直接尝试解析JSON
+ json_data = json.loads(processed_result)
+ json_data["error"] = False
+ json_data["judge_success"] = True
+ return json_data
+
+ except json.JSONDecodeError as json_err:
+ # JSON解析失败,记录错误并尝试更基本的处理方法
+ logging.warning(f"解析内容时出错: {json_err}, 尝试提取JSON部分")
+
+ try:
+ # 尝试找到JSON部分(从第一个{到最后一个})
+ json_start = processed_result.find('{')
+ json_end = processed_result.rfind('}') + 1
+
+ if json_start >= 0 and json_end > json_start:
+ json_str = processed_result[json_start:json_end]
+ json_data = json.loads(json_str)
+ json_data["error"] = False
+ json_data["judge_success"] = True
+ return json_data
+ except Exception as e:
+ logging.error(f"尝试提取JSON部分失败: {e}")
+
+ except Exception as e:
+ logging.error(f"解析内容时出错: {e}")
+
+ # 所有解析方法都失败,返回一个默认结果
+ return {
+ "title": "",
+ "content": "",
+ "error": True,
+ "judge_success": False,
+ "analysis": f"内容解析失败,错误信息: {str(e)}"
+ }
+
+ def judge_content(self, product_info, content, temperature=0.2, top_p=0.5, presence_penalty=0.0):
+ """审核内容"""
logging.info("开始内容审核流程")
+
# 构建用户提示词
user_prompt = self._build_user_prompt(product_info, content)
+ response_id = int(time.time())
try:
- # 调用AI模型进行内容审核
- logging.info("调用AI模型进行内容审核")
- start_time = time.time()
-
- # 使用AI_Agent的工作方法
+ # 调用AI模型
result, _, _ = self.ai_agent.work(
system_prompt=self._system_prompt,
user_prompt=user_prompt,
- file_folder=None, # 不使用文件夹
+ file_folder=None,
temperature=self._temperature,
top_p=self._topp,
presence_penalty=self._presence_penatly,
)
- end_time = time.time()
- logging.info(f"AI模型响应完成,耗时:{end_time - start_time:.2f}秒")
+ # 保存原始响应以便调试
+ self._save_response(result, response_id)
- # 保存原始响应用于调试
- response_log_dir = "/root/autodl-tmp/TravelContentCreator/log/judge_responses"
- os.makedirs(response_log_dir, exist_ok=True)
- response_log_file = f"{response_log_dir}/response_{int(time.time())}.txt"
- with open(response_log_file, "w", encoding="utf-8") as f:
- f.write(result)
- logging.info(f"原始响应已保存到: {response_log_file}")
+ # 使用简化的解析方法处理响应
+ content_json = self._split_content(result)
+
+ # 检查解析结果是否有错误
+ if content_json.get("error", False):
+ logging.warning(f"内容解析失败,使用原内容")
+ return self._create_fallback_result(content)
+
+ # 检查必要字段是否存在
+ if "title" not in content_json or "content" not in content_json:
+ logging.warning(f"解析结果缺少必要字段 'title' 或 'content'")
+ content_json["judge_success"] = False
+ return self._create_fallback_result(content)
+
+ # 添加Base64编码内容
+ result_dict = {
+ "judge_success": content_json.get("judge_success", True),
+ "judged": True,
+ "title": content_json["title"],
+ "content": content_json["content"],
+ "title_base64": base64.b64encode(content_json["title"].encode('utf-8')).decode('utf-8'),
+ "content_base64": base64.b64encode(content_json["content"].encode('utf-8')).decode('utf-8')
+ }
+
+ # 如果有analysis字段,也包含
+ if "analysis" in content_json:
+ result_dict["analysis"] = content_json["analysis"]
+ result_dict["analysis_base64"] = base64.b64encode(content_json["analysis"].encode('utf-8')).decode('utf-8')
+
+ return result_dict
- # 提取修改后的内容
- modified_content = self._extract_modified_content(result)
- if modified_content:
- logging.info("成功提取修改后的内容")
- # 添加judge_success字段
- modified_content["judge_success"] = True
-
- # 对内容进行最终清理,确保可以安全序列化为JSON
- modified_content = self._prepare_content_for_serialization(modified_content)
-
- # 记录处理后的内容用于调试
- debug_log_file = f"{response_log_dir}/processed_{int(time.time())}.json"
- try:
- serialized_content = json.dumps(modified_content, ensure_ascii=False, allow_nan=True, indent=2)
- with open(debug_log_file, "w", encoding="utf-8") as f:
- f.write(serialized_content)
- logging.info(f"处理后的内容已保存到: {debug_log_file}")
- except Exception as e:
- logging.error(f"尝试记录处理后内容时序列化失败: {e}")
- with open(debug_log_file, "w", encoding="utf-8") as f:
- f.write(f"序列化失败: {str(e)}\n\n")
- f.write(f"title: {modified_content.get('title', 'N/A')}\n")
- f.write(f"content前100字符: {str(modified_content.get('content', 'N/A'))[:100]}")
-
- # 验证序列化是否成功
- try:
- json.dumps(modified_content, ensure_ascii=False, allow_nan=True)
- logging.info("内容可以安全序列化为JSON")
- except Exception as e:
- logging.error(f"验证序列化时出错: {e}")
- # 找出导致错误的字段
- for key, value in modified_content.items():
- if isinstance(value, str):
- try:
- json.dumps(value, ensure_ascii=False)
- except Exception as sub_e:
- logging.error(f"字段 '{key}' 无法序列化: {sub_e}")
- # 尝试定位问题字符
- for i, char in enumerate(value):
- try:
- json.dumps(char, ensure_ascii=False)
- except:
- logging.error(f"位置 {i}, 字符 '{char}' (Unicode: U+{ord(char):04X}) 导致错误")
-
- modified_content["raw_result"] = str(e)
- modified_content["error"] = True
-
- return modified_content
- else:
- logging.error("无法从响应中提取有效内容")
- # 尝试使用原始内容并标记审核失败
- if isinstance(content, dict) and "title" in content and "content" in content:
- result_content = {
- "title": content.get("title", "提取失败"),
- "content": content.get("content", "无法从响应中提取有效内容"),
- "judge_success": False
- }
- # 确保可以序列化
- return self._prepare_content_for_serialization(result_content)
- result_content = {
- "title": "提取失败",
- "content": "无法从响应中提取有效内容",
- "judge_success": False
- }
- return self._prepare_content_for_serialization(result_content)
-
except Exception as e:
logging.exception(f"审核过程中出错: {e}")
- result_content = {
- "title": "审核失败",
- "content": f"审核过程中出错: {str(e)}",
- "judge_success": False
- }
- return self._prepare_content_for_serialization(result_content)
-
+ return self._create_fallback_result(content, error_msg=str(e))
+
+ def _save_response(self, response, response_id):
+ """保存原始响应"""
+ try:
+ response_log_dir = "/root/autodl-tmp/TravelContentCreator/log/judge_responses"
+ os.makedirs(response_log_dir, exist_ok=True)
+ with open(f"{response_log_dir}/response_{response_id}.txt", "w", encoding="utf-8") as f:
+ f.write(response)
+ except Exception as e:
+ logging.error(f"保存原始响应失败: {e}")
+
+ def _create_fallback_result(self, content, error_msg="解析失败"):
+ """创建回退结果"""
+ if isinstance(content, str):
+ # 尝试解析内容字符串看是否是JSON字符串
+ try:
+ content_obj = json.loads(content)
+ title = content_obj.get("title", "")
+ content_text = content_obj.get("content", "")
+ except:
+ # 不是JSON字符串,视为纯文本内容
+ title = "审核失败"
+ content_text = content
+ elif isinstance(content, dict):
+ # 已经是字典对象
+ title = content.get("title", "")
+ content_text = content.get("content", "")
+ else:
+ # 其他类型,创建空内容
+ title = "审核失败"
+ content_text = f"无法解析内容: {error_msg}"
+
+ return {
+ "judge_success": False,
+ "judged": True,
+ "title": title,
+ "content": content_text,
+ "title_base64": base64.b64encode(title.encode('utf-8')).decode('utf-8'),
+ "content_base64": base64.b64encode(content_text.encode('utf-8')).decode('utf-8'),
+ "analysis": f"内容审核失败: {error_msg}",
+ "analysis_base64": base64.b64encode(f"内容审核失败: {error_msg}".encode('utf-8')).decode('utf-8')
+ }
+
def _build_user_prompt(self, product_info, content_gen):
"""
构建用户提示词
@@ -293,367 +316,16 @@ class ContentJudger:
Returns:
str: 构建好的用户提示词
"""
+ # 确保content_gen为字符串格式
+ if isinstance(content_gen, dict):
+ content_str = f"title: {content_gen.get('title', '')}\n\ncontent: {content_gen.get('content', '')}"
+ else:
+ content_str = str(content_gen)
+
return f"""
## 产品资料(真实信息,作为判断依据):
{product_info}
## 运营生成的文案(需要审核的内容):
-{content_gen}
-"""
-
- def _extract_modified_content(self, result_text):
- """从检测结果文本中提取修改后的文案内容"""
- try:
- processed_text = result_text # Work on a copy of the input text
- # 记录原始文本前100个字符用于调试
- logging.debug(f"原始响应文本前100字符: {result_text[:100]}")
-
- # 尝试方法1: 使用标签分离内容
- if "" in processed_text:
- processed_text = processed_text.split("", 1)[1].strip()
- logging.debug("检测到标签并分离内容")
-
- # 尝试方法2: 预处理文本并尝试解析JSON
- try:
- # 彻底清理文本,去除所有可能影响JSON解析的控制字符
- cleaned_text = self._sanitize_json_text(processed_text)
- logging.debug(f"清理后文本前100字符: {cleaned_text[:100]}")
-
- content_json = json.loads(cleaned_text)
- if "title" in content_json and "content" in content_json:
- logging.info("成功通过JSON解析提取内容")
- title = content_json.get("title", "").strip()
- content = content_json.get("content", "").strip()
- analysis = content_json.get("analysis", "")
- logging.debug(f"提取到标题: {title[:30]}...")
- return {
- "title": title,
- "content": content,
- "analysis": analysis
- }
- except json.JSONDecodeError as e:
- logging.warning(f"JSON解析失败: {e},将尝试其他提取方法")
- # 记录更多错误信息以便调试
- error_position = e.pos
- error_context = cleaned_text[max(0, error_position-30):min(len(cleaned_text), error_position+30)]
- logging.debug(f"错误位置附近的文本: {error_context}")
- logging.debug(f"错误行列: 行 {e.lineno}, 列 {e.colno}")
-
- # 尝试方法3: 从文本中提取JSON格式部分
- json_start = processed_text.find('{')
- json_end = processed_text.rfind('}') + 1
- if json_start >= 0 and json_end > json_start:
- json_str = processed_text[json_start:json_end]
- logging.debug(f"找到JSON字符串,长度: {len(json_str)},前100字符: {json_str[:100]}")
-
- # 清理可能破坏JSON解析的控制字符
- json_str_cleaned = self._sanitize_json_text(json_str)
- try:
- content_json = json.loads(json_str_cleaned)
- if "title" in content_json and "content" in content_json:
- logging.info("成功从文本中提取JSON部分并解析")
- return {
- "title": content_json.get("title", "").strip(),
- "content": content_json.get("content", "").strip(),
- "analysis": content_json.get("analysis", "")
- }
- except json.JSONDecodeError as e:
- logging.warning(f"JSON子串解析失败: {e},将尝试正则表达式提取")
- # 保存导致错误的JSON字符串到文件
- self._save_problematic_json(json_str_cleaned, e)
-
- # 尝试方法4: 手动解析JSON格式的关键字段
- try:
- logging.debug("尝试手动解析JSON结构")
- manual_result = self._manual_json_extract(processed_text)
- if manual_result and "title" in manual_result and "content" in manual_result:
- logging.info("成功通过手动解析JSON提取内容")
- return manual_result
- except Exception as e:
- logging.warning(f"手动解析JSON失败: {e}")
-
- # 尝试方法5: 使用正则表达式提取
- logging.debug("尝试使用正则表达式提取")
- # 更强大的正则表达式,处理多行内容
- title_match = re.search(r'"title"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
- content_match = re.search(r'"content"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
- analysis_match = re.search(r'"analysis"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
-
- if title_match and content_match:
- logging.info("成功使用正则表达式提取标题和内容")
- return {
- "title": title_match.group(1).replace('\\"', '"').strip(),
- "content": content_match.group(1).replace('\\"', '"').strip(),
- "analysis": analysis_match.group(1).replace('\\"', '"').strip() if analysis_match else ""
- }
-
- # 尝试方法6: 查找使用单引号的内容
- logging.debug("尝试查找使用单引号的内容")
- title_match = re.search(r'"title"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
- content_match = re.search(r'"content"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
- analysis_match = re.search(r'"analysis"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
-
- if title_match and content_match:
- logging.info("成功使用单引号正则表达式提取内容")
- return {
- "title": title_match.group(1).strip(),
- "content": content_match.group(1).strip(),
- "analysis": analysis_match.group(1).strip() if analysis_match else ""
- }
-
- # 尝试方法7: 使用非标准格式提取
- logging.debug("尝试非标准格式提取")
- title_pattern = re.compile(r'["""]?title["""]?[::]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
- content_pattern = re.compile(r'["""]?content["""]?[::]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
- analysis_pattern = re.compile(r'["""]?analysis["""]?[::]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
-
- title_match = title_pattern.search(processed_text)
- content_match = content_pattern.search(processed_text)
- analysis_match = analysis_pattern.search(processed_text)
-
- if title_match and content_match:
- logging.info("成功使用灵活模式匹配提取内容")
- return {
- "title": title_match.group(1).strip(),
- "content": content_match.group(1).strip(),
- "analysis": analysis_match.group(1).strip() if analysis_match else ""
- }
-
- logging.warning(f"所有提取方法失败,响应前300字符: {processed_text[:300]}...")
- return None # 所有方法失败时的回退选项
-
- except Exception as e:
- logging.error(f"内容提取过程中发生意外错误: {e}\n{traceback.format_exc()}")
- return None
-
- def _sanitize_json_text(self, text):
- """彻底清理文本,确保可以安全解析为JSON,同时保留换行符"""
- # 步骤1: 处理控制字符,但保留换行符、回车和制表符
- cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
-
- # 不再将实际换行符转换为\n字符串,保留原始换行符
- # cleaned = cleaned.replace('\n', '\\n').replace('\r', '\\r')
-
- # 步骤3: 处理内容字段中开始或结束可能存在的多余空格或引号
- cleaned = re.sub(r'"content"\s*:\s*"\s*', '"content":"', cleaned)
- cleaned = re.sub(r'"\s*,', '",', cleaned)
-
- # 步骤4: 处理未转义的引号和反斜杠
- cleaned = re.sub(r'(?= 0:
- colon_pos = text.find(':', title_start)
- if colon_pos > 0:
- quote_pos = text.find('"', colon_pos)
- if quote_pos > 0:
- end_quote_pos = text.find('"', quote_pos + 1)
- while end_quote_pos > 0 and text[end_quote_pos-1] == '\\':
- end_quote_pos = text.find('"', end_quote_pos + 1)
- if end_quote_pos > 0:
- result['title'] = text[quote_pos+1:end_quote_pos].replace('\\"', '"').strip()
-
- # 查找content字段
- content_start = text.find('"content"')
- if content_start >= 0:
- colon_pos = text.find(':', content_start)
- if colon_pos > 0:
- quote_pos = text.find('"', colon_pos)
- if quote_pos > 0:
- # 查找非转义双引号
- pos = quote_pos + 1
- content_end = -1
- while pos < len(text):
- if text[pos] == '"' and (pos == 0 or text[pos-1] != '\\'):
- content_end = pos
- break
- pos += 1
-
- if content_end > 0:
- content = text[quote_pos+1:content_end].replace('\\"', '"')
- # 处理反斜杠转义的换行符,如果字符串中有'\n',将其转换为实际换行符
- # 但如果已经是实际的换行符,则保留
- if '\\n' in content:
- content = content.replace('\\n', '\n')
- if '\\r' in content:
- content = content.replace('\\r', '\r')
- result['content'] = content.strip()
-
- # 查找analysis字段
- analysis_start = text.find('"analysis"')
- if analysis_start >= 0:
- colon_pos = text.find(':', analysis_start)
- if colon_pos > 0:
- quote_pos = text.find('"', colon_pos)
- if quote_pos > 0:
- pos = quote_pos + 1
- analysis_end = -1
- while pos < len(text):
- if text[pos] == '"' and (pos == 0 or text[pos-1] != '\\'):
- analysis_end = pos
- break
- pos += 1
-
- if analysis_end > 0:
- analysis = text[quote_pos+1:analysis_end].replace('\\"', '"')
- # 处理反斜杠转义的换行符
- if '\\n' in analysis:
- analysis = analysis.replace('\\n', '\n')
- if '\\r' in analysis:
- analysis = analysis.replace('\\r', '\r')
- result['analysis'] = analysis.strip()
-
- return result if 'title' in result and 'content' in result else None
- except Exception as e:
- logging.error(f"手动解析过程中出错: {e}")
- return None
-
- def _save_problematic_json(self, json_text, error):
- """保存导致解析错误的JSON字符串,用于调试"""
- try:
- error_log_dir = "/root/autodl-tmp/TravelContentCreator/log/json_errors"
- os.makedirs(error_log_dir, exist_ok=True)
- error_log_file = f"{error_log_dir}/error_{int(time.time())}.json"
-
- with open(error_log_file, "w", encoding="utf-8") as f:
- f.write(f"# 错误信息: {str(error)}\n")
- f.write(f"# 错误位置: 行 {error.lineno}, 列 {error.colno}\n")
- f.write(json_text)
-
- logging.info(f"已保存问题JSON到: {error_log_file}")
- except Exception as e:
- logging.error(f"保存问题JSON时出错: {e}")
-
- def test_extraction_from_file(self, response_file_path):
- """
- 从文件中读取响应并测试提取功能
-
- Args:
- response_file_path: 响应文件路径
-
- Returns:
- dict: 提取结果
- """
- try:
- logging.info(f"从文件测试提取: {response_file_path}")
- with open(response_file_path, 'r', encoding='utf-8') as f:
- response_text = f.read()
-
- result = self._extract_modified_content(response_text)
- if result:
- logging.info(f"成功从文件提取内容: {result.get('title', '')[:30]}...")
- return {"success": True, "result": result}
- else:
- logging.error(f"从文件中提取内容失败")
- return {"success": False, "error": "提取失败"}
-
- except Exception as e:
- logging.exception(f"测试提取时发生错误: {e}")
- return {"success": False, "error": str(e)}
-
- def _prepare_content_for_serialization(self, content_dict):
- """
- 对内容进行处理,确保可以安全序列化为JSON,同时保留emoji字符和换行符
-
- Args:
- content_dict: 内容字典
-
- Returns:
- dict: 处理后的内容字典
- """
- try:
- # 创建一个新字典,避免修改原始内容
- safe_dict = {}
-
- for key, value in content_dict.items():
- # 处理字符串类型的值
- if isinstance(value, str):
- # 第一步:清理控制字符,但保留换行符、回车和制表符
- safe_value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)
-
- # 确保文本中的反斜杠换行符(如\\n)被转换为实际换行符
- if '\\n' in safe_value:
- safe_value = safe_value.replace('\\n', '\n')
- if '\\r' in safe_value:
- safe_value = safe_value.replace('\\r', '\r')
-
- # 第二步:将emoji字符和其他非ASCII字符转换为相应的Unicode转义序列
- char_list = []
- for char in safe_value:
- # 保留常见的控制字符(换行符、回车、制表符)
- if char in '\n\r\t':
- char_list.append(char)
- elif ord(char) > 127: # 非ASCII字符
- # 尝试保留高位字符(包括emoji)
- try:
- # 验证这个字符是否可以安全序列化
- json.dumps(char, ensure_ascii=False)
- char_list.append(char)
- except:
- # 如果这个字符无法序列化,使用其Unicode码点的字符串表示
- char_list.append(f"\\u{ord(char):04x}")
- else:
- char_list.append(char)
-
- processed_value = ''.join(char_list)
-
- # 最终验证这个值是否可以安全序列化
- try:
- json.dumps(processed_value, ensure_ascii=False)
- safe_dict[key] = processed_value
- except Exception as e:
- logging.warning(f"处理后的'{key}'值仍无法序列化: {e},将进行更严格处理")
- # 更严格的处理:保留ASCII字符和基本控制字符
- safe_value = ''
- for c in processed_value:
- if c in '\n\r\t' or (32 <= ord(c) < 127):
- safe_value += c
- safe_dict[key] = safe_value
- else:
- safe_dict[key] = value
-
- # 最终验证整个字典是否可序列化
- try:
- # 使用ensure_ascii=False允许非ASCII字符直接出现在JSON中
- # 使用allow_nan=True允许特殊浮点数值
- json_str = json.dumps(safe_dict, ensure_ascii=False, allow_nan=True)
- # 验证生成的JSON是否有效
- json.loads(json_str)
- except Exception as e:
- logging.error(f"最终字典序列化验证失败: {e}")
- # 如果依然失败,返回一个绝对安全的结果,但保留换行符
- safe_content = ''
- original_content = content_dict.get("content", "内容包含无法安全序列化的字符")
- for c in original_content:
- if c in '\n\r\t' or (32 <= ord(c) < 127):
- safe_content += c
-
- return {
- "title": re.sub(r'[^\x20-\x7E]', '', content_dict.get("title", "序列化处理失败")),
- "content": safe_content,
- "judge_success": content_dict.get("judge_success", False),
- "error": True,
- "raw_result": str(e)
- }
-
- return safe_dict
- except Exception as e:
- logging.error(f"序列化准备过程中发生意外错误: {e}")
- return {
- "title": "序列化处理失败",
- "content": "处理内容时发生意外错误",
- "judge_success": False,
- "error": True,
- "raw_result": str(e)
- }
\ No newline at end of file
+{content_str}
+"""
\ No newline at end of file
diff --git a/utils/output_handler.py b/utils/output_handler.py
index 18353f6..b6d9b14 100644
--- a/utils/output_handler.py
+++ b/utils/output_handler.py
@@ -281,17 +281,65 @@ class FileSystemOutputHandler(OutputHandler):
return ""
return ''.join(c for c in text if 32 <= ord(c) <= 126)
+ def _preprocess_for_json(self, text):
+ """预处理文本,将换行符转换为\\n形式,保证JSON安全"""
+ if not isinstance(text, str):
+ return text
+ # 将所有实际换行符替换为\\n字符串
+ return text.replace('\n', '\\n').replace('\r', '\\r')
+
def handle_poster_configs(self, run_id: str, topic_index: int, config_data: list | dict):
- """Saves the complete poster configuration list/dict for a topic."""
- run_dir = self._get_run_dir(run_id)
- config_path = os.path.join(run_dir, f"topic_{topic_index}_poster_configs.json")
+ """处理海报配置数据"""
+ # 处理海报配置数据
try:
- with open(config_path, 'w', encoding='utf-8') as f_cfg_topic:
- # 不使用自定义编码器,使用标准json
- json.dump(config_data, f_cfg_topic, ensure_ascii=False, indent=4, ignore_nan=True)
- logging.info(f"Saved complete poster configurations for topic {topic_index} to: {config_path}")
- except Exception as save_err:
- logging.error(f"Failed to save complete poster configurations for topic {topic_index} to {config_path}: {save_err}")
+ # 创建目标目录
+ variant_dir = os.path.join(self._get_run_dir(run_id), f"{topic_index}_1")
+ os.makedirs(variant_dir, exist_ok=True)
+
+ # 确保配置数据是可序列化的
+ processed_configs = []
+ if isinstance(config_data, list):
+ for config in config_data:
+ processed_config = {}
+ # 处理索引字段
+ processed_config["index"] = config.get("index", 0)
+
+ # 处理标题字段,应用JSON预处理
+ main_title = config.get("main_title", "")
+ processed_config["main_title"] = self._preprocess_for_json(main_title)
+
+ # 处理文本字段列表,对每个文本应用JSON预处理
+ texts = config.get("texts", [])
+ processed_texts = []
+ for text in texts:
+ processed_texts.append(self._preprocess_for_json(text))
+ processed_config["texts"] = processed_texts
+
+ processed_configs.append(processed_config)
+ else:
+ # 如果不是列表,可能是字典或其他格式,尝试转换
+ if isinstance(config_data, dict):
+ # 处理单个配置字典
+ processed_config = {}
+ processed_config["index"] = config_data.get("index", 0)
+ processed_config["main_title"] = self._preprocess_for_json(config_data.get("main_title", ""))
+
+ texts = config_data.get("texts", [])
+ processed_texts = []
+ for text in texts:
+ processed_texts.append(self._preprocess_for_json(text))
+ processed_config["texts"] = processed_texts
+
+ processed_configs.append(processed_config)
+
+ # 保存配置到JSON文件
+ config_file_path = os.path.join(variant_dir, f"topic_{topic_index}_poster_configs.json")
+ with open(config_file_path, 'w', encoding='utf-8') as f:
+ json.dump(processed_configs, f, ensure_ascii=False, indent=4, cls=self.SafeJSONEncoder)
+ logging.info(f"Successfully saved poster configs to {config_file_path}")
+ except Exception as e:
+ logging.error(f"Error saving poster configs: {e}")
+ traceback.print_exc()
def handle_generated_image(self, run_id: str, topic_index: int, variant_index: int, image_type: str, image_data, output_filename: str, metadata: dict = None):
"""处理生成的图像,对于笔记图像和额外配图保存到image目录,其他类型保持原有路径结构"""
diff --git a/utils/tweet_generator.py b/utils/tweet_generator.py
index 7bf8725..23b90c8 100644
--- a/utils/tweet_generator.py
+++ b/utils/tweet_generator.py
@@ -620,15 +620,47 @@ def generate_posters_for_topic(topic_item: dict,
if os.path.exists(content_path):
with open(content_path, 'r', encoding='utf-8') as f_content:
content_data = json.load(f_content)
+
+ # 支持Base64编码格式的文件
+ if 'title_base64' in content_data and 'content_base64' in content_data:
+ import base64
+ logging.info(f"检测到Base64编码的内容文件: {content_path}")
+
+ # 解码Base64内容
+ try:
+ title = base64.b64decode(content_data.get('title_base64', '')).decode('utf-8')
+ content = base64.b64decode(content_data.get('content_base64', '')).decode('utf-8')
+
+ # 创建包含解码内容的新数据对象
+ decoded_data = {
+ 'title': title,
+ 'content': content,
+ 'judge_success': content_data.get('judge_success', True),
+ 'judged': content_data.get('judged', True)
+ }
+
+ # 如果有标签,也解码
+ if 'tags_base64' in content_data:
+ tags = base64.b64decode(content_data.get('tags_base64', '')).decode('utf-8')
+ decoded_data['tags'] = tags
+
+ loaded_content_list.append(decoded_data)
+ logging.debug(f" 已成功解码并加载Base64内容: {content_path}")
+ continue
+ except Exception as decode_error:
+ logging.error(f" 解码Base64内容时出错: {decode_error},跳过此文件")
+ continue
+
+ # 常规JSON格式检查
if isinstance(content_data, dict) and 'title' in content_data and 'content' in content_data:
- loaded_content_list.append(content_data)
- logging.debug(f" Successfully loaded content from: {content_path}")
+ loaded_content_list.append(content_data)
+ logging.debug(f" Successfully loaded content from: {content_path}")
else:
- logging.warning(f" Content file {content_path} has invalid format. Skipping.")
+ logging.warning(f" Content file {content_path} has invalid format. Skipping.")
else:
logging.warning(f" Content file not found for variant {variant_index}: {content_path}. Skipping.")
except json.JSONDecodeError:
- logging.error(f" Error decoding JSON from content file: {content_path}. Skipping.")
+ logging.error(f" Error decoding JSON from content file: {content_path}. Skipping.")
except Exception as e:
logging.exception(f" Error loading content file {content_path}: {e}")