基于tweet_generator的结果修改了content_judger的结果提取

This commit is contained in:
jinye_huang 2025-05-19 20:52:31 +08:00
parent fe1cbae9c8
commit 44c79ec8e5
8 changed files with 354 additions and 657 deletions

View File

@ -193,108 +193,66 @@ class ContentGenerator:
self.logger.debug(f"原始内容: {content[:200]}...") # 仅显示前200个字符
return content.strip() # 返回原始内容,让后续验证函数处理
def generate_posters(self,
poster_num,
content_data_list,
system_prompt=None,
api_url=None,
model_name=None,
api_key=None,
timeout=60,
max_retries=3):
def _preprocess_for_json(self, text):
"""预处理文本,将换行符转换为\\n形式保证JSON安全"""
if not isinstance(text, str):
return text
# 将所有实际换行符替换为\\n字符串
return text.replace('\n', '\\n').replace('\r', '\\r')
def generate_posters(self, poster_num, content_data_list, system_prompt=None,
api_url=None, model_name=None, api_key=None, timeout=120, max_retries=3):
"""
生成海报内容
参数:
poster_num: 海报数量
content_data_list: 内容数据列表字典或字符串
system_prompt: 系统提示默认为None则使用预设提示
api_url: API基础URL
model_name: 使用的模型名称
api_key: API密钥
timeout: 请求超时时间
max_retries: 最大重试次数
生成海报配置
返回:
生成的海报内容
Args:
poster_num: 生成的海报数量
content_data_list: 内容数据列表
system_prompt: 系统提示词可选
api_url: API基础URL可选
model_name: 模型名称可选
api_key: API密钥可选
Returns:
str: 生成的配置JSON字符串
"""
# 构建默认系统提示词
if not system_prompt:
system_prompt = """
你是一名资深海报设计师有丰富的爆款海报设计经验你现在要为旅游景点做宣传在小红书上发布大量宣传海报你的主要工作目标有2个
1你要根据我给你的图片描述和笔记推文内容设计图文匹配的海报
2为海报设计文案文案的<第一个小标题><第二个小标题>之间你需要检查是否逻辑关系合理你将通过先去生成<第二个小标题>关于景区亮点的部分再去综合判断<第一个小标题>应该如何搭配组合更符合两个小标题的逻辑再生成<第一个小标题>
其中生成三类标题文案的通用性要求如下
1生成的<大标题>字数必须小于8个字符
2生成的<第一个小标题>字数和<第二个小标题>字数两者都必须小8个字符
3标题和文案都应符合中国社会主义核心价值观
接下来先开始生成<大标题>部分由于海报是用来宣传旅游景点生成的海报<大标题>必须使用以下8种格式之一
地名景点名例如福建厦门鼓浪屿/厦门鼓浪屿
地名+景点名+plog
拿捏+地名+景点名
地名+景点名+攻略
速通+地名+景点名
推荐+地名+景点名
勇闯+地名+景点名
收藏+地名+景点名
你需要随机挑选一种格式生成对应景点的文案但是格式除了上面8种不可以有其他任何格式同时尽量保证每一种格式出现的频率均衡
接下来先去生成<第二个小标题><第二个小标题>文案的创作必须遵循以下原则
请根据笔记内容和图片识别用极简的文字概括这篇笔记和图片中景点的特色亮点其中你可以参考以下词汇进行创作这段文案字数控制6-8字符以内
特色亮点可能会出现的词汇不完全举例非遗古建绝佳山水祈福圣地研学圣地解压天堂中国小瑞士秘境竹筏游等等类型词汇
接下来再去生成<第一个小标题><第一个小标题>文案的创作必须遵循以下原则
这部分文案创作公式有5种分别为
<受众人群画像>+<痛点词>
<受众人群画像>
<痛点词>
<受众人群画像>+ | +<痛点词>
<痛点词>+ | +<受众人群画像>
请你根据实际笔记内容结合这部分文案创作公式需要结合<受众人群画像><痛点词>必须根据<第二个小标题>的景点特征和所对应的完整笔记推文内容主旨特征挑选对应<受众人群画像><痛点词>
我给你提供受众人群画像库和痛点词库如下
1受众人群画像库情侣党亲子游合家游银发族亲子研学学生党打工人周边游本地人穷游党性价比户外人美食党出片
2痛点词库3天2夜必去看了都哭了不能错过一定要来问爆了超全攻略必打卡强推懒人攻略必游榜小众打卡狂喜等等
你需要为每个请求至少生成{poster_num}个海报设计请使用JSON格式输出结果结构如下
[
{
"index": 1,
"main_title": "主标题内容",
"texts": ["第一个小标题", "第二个小标题"]
},
{
"index": 2,
"main_title": "主标题内容",
"texts": ["第一个小标题", "第二个小标题"]
},
// ... 更多海报
]
确保生成的数量与用户要求的数量一致只生成上述JSON格式内容不要有其他任何额外内容
"""
# 更新API设置
if api_url:
self.base_url = api_url
if model_name:
self.model_name = model_name
if api_key:
self.api_key = api_key
# 使用系统提示或默认提示
if system_prompt:
self.system_prompt = system_prompt
elif not self.system_prompt:
self.system_prompt = """你是一名专业的旅游景点海报文案创作专家。你的任务是根据提供的旅游景点信息和推文内容生成海报文案配置。你的回复必须是一个JSON数组每一项表示一个海报配置包含'index''main_title''texts'三个字段,其中'texts'是一个字符串数组。海报文案要简洁有力,突出景点特色和吸引力。"""
# 提取内容文本(如果是列表内容数据)
tweet_content = ""
if isinstance(content_data_list, list):
for item in content_data_list:
if isinstance(item, dict):
title = item.get('title', '')
content = item.get('content', '')
# 对标题和内容进行预处理,替换换行符
title = self._preprocess_for_json(item.get('title', ''))
content = self._preprocess_for_json(item.get('content', ''))
tweet_content += f"<title>\n{title}\n</title>\n<content>\n{content}\n</content>\n\n"
elif isinstance(item, str):
tweet_content += item + "\n\n"
tweet_content += self._preprocess_for_json(item) + "\n\n"
elif isinstance(content_data_list, str):
tweet_content = content_data_list
tweet_content = self._preprocess_for_json(content_data_list)
# 构建用户提示
if self.add_description:
# 预处理景点描述
processed_description = self._preprocess_for_json(self.add_description)
user_content = f"""
以下是需要你处理的信息
关于景点的描述:
{self.add_description}
{processed_description}
推文内容:
{tweet_content}
@ -326,7 +284,7 @@ class ContentGenerator:
# 使用AI_Agent的non-streaming方法
self.logger.info(f"调用AI生成海报配置模型: {self.model_name}")
full_response, tokens, time_cost = ai_agent.work(
system_prompt,
self.system_prompt,
user_content,
"", # 历史消息(空)
self.temperature,
@ -409,118 +367,105 @@ class ContentGenerator:
def _validate_and_fix_data(self, data):
"""
验证并修复数据格式确保符合期结构
参数:
验证并修复从AI返回的数据确保符合期望的结构
Args:
data: 需要验证的数据
返回:
修复后的数据
Returns:
list: 修复后的数据列表
"""
fixed_data = []
self.logger.info(f"验证并修复数据: {type(data)}")
# 记录原始数据格式信息
self.logger.info(f"验证和修复数据,原始数据类型: {type(data)}")
if isinstance(data, list):
self.logger.info(f"原始数据是列表,长度: {len(data)}")
if len(data) > 0:
self.logger.info(f"第一个元素类型: {type(data[0])}")
elif isinstance(data, str):
self.logger.info(f"原始数据是字符串: {data[:100]}")
else:
self.logger.info(f"原始数据是其他类型: {data}")
# 尝试处理字符串类型 (通常是JSON字符串)
if isinstance(data, str):
try:
# 尝试将字符串解析为JSON对象
parsed_data = json.loads(data)
# 递归调用本函数处理解析后的数据
return self._validate_and_fix_data(parsed_data)
except json.JSONDecodeError as e:
self.logger.warning(f"JSON解析失败: {e}")
# 可以选择尝试清理和再次解析
try:
# 寻找字符串中第一个 [ 和最后一个 ] 之间的内容
start_idx = data.find('[')
end_idx = data.rfind(']')
if start_idx >= 0 and end_idx > start_idx:
json_part = data[start_idx:end_idx+1]
self.logger.info(f"尝试从字符串中提取JSON部分: {json_part[:100]}...")
parsed_data = json.loads(json_part)
return self._validate_and_fix_data(parsed_data)
except:
self.logger.warning("无法从字符串中提取有效的JSON部分")
fixed_data.append({
"index": 1,
"main_title": self._preprocess_for_json("默认标题"), # 应用预处理
"texts": [self._preprocess_for_json("默认副标题1"), self._preprocess_for_json("默认副标题2")] # 应用预处理
})
# 如果数据是列表
if isinstance(data, list):
for i, item in enumerate(data):
# 检查项目是否为字典
# 处理列表类型
elif isinstance(data, list):
for idx, item in enumerate(data):
# 如果是字典,检查必须字段
if isinstance(item, dict):
# 确保必需字段存在
fixed_item = {
"index": item.get("index", i + 1),
"main_title": item.get("main_title", ""),
"texts": item.get("texts", ["", ""])
}
fixed_item = {}
# 设置索引
fixed_item["index"] = item.get("index", idx + 1)
# 确保texts是列表格式
if not isinstance(fixed_item["texts"], list):
if isinstance(fixed_item["texts"], str):
fixed_item["texts"] = [fixed_item["texts"], ""]
else:
fixed_item["texts"] = ["", ""]
# 限制texts最多包含两个元素
if len(fixed_item["texts"]) > 2:
fixed_item["texts"] = fixed_item["texts"][:2]
elif len(fixed_item["texts"]) < 2:
# 处理主标题
if "main_title" in item and item["main_title"]:
# 应用预处理,确保所有换行符被正确转义
fixed_item["main_title"] = self._preprocess_for_json(item["main_title"])
else:
fixed_item["main_title"] = "默认标题"
# 处理文本列表
if "texts" in item and isinstance(item["texts"], list) and len(item["texts"]) > 0:
# 对文本列表中的每个元素应用预处理
fixed_item["texts"] = [self._preprocess_for_json(text) if text else "" for text in item["texts"]]
# 确保至少有两个元素
while len(fixed_item["texts"]) < 2:
fixed_item["texts"].append("")
fixed_data.append(fixed_item)
# 如果项目是字符串可能是错误格式的texts值
elif isinstance(item, str):
self.logger.warning(f"配置项 {i+1} 是字符串格式: '{item}',将转换为标准格式")
# 尝试解析字符串格式,例如"性价比|必打卡"
texts = []
if "|" in item:
texts = item.split("|")
else:
texts = [item, ""]
fixed_item["texts"] = ["默认副标题1", "默认副标题2"]
fixed_item = {
"index": i + 1,
"main_title": "",
"texts": texts
}
fixed_data.append(fixed_item)
else:
self.logger.warning(f"配置项 {i+1} 格式不支持: {type(item)},将使用默认值")
# 如果是字符串,转换为默认格式
elif isinstance(item, str):
fixed_data.append({
"index": i + 1,
"main_title": "",
"index": idx + 1,
"main_title": self._preprocess_for_json(item), # 应用预处理
"texts": ["", ""]
})
# 其他类型,使用默认值
else:
fixed_data.append({
"index": idx + 1,
"main_title": "默认标题",
"texts": ["", ""]
})
# 如果数据是字典
# 处理字典类型 (单个配置项)
elif isinstance(data, dict):
fixed_item = {
"index": data.get("index", 1),
"main_title": data.get("main_title", ""),
"texts": data.get("texts", ["", ""])
}
# 处理主标题
main_title = self._preprocess_for_json(data.get("main_title", "默认标题")) # 应用预处理
# 确保texts是列表格式
if not isinstance(fixed_item["texts"], list):
if isinstance(fixed_item["texts"], str):
fixed_item["texts"] = [fixed_item["texts"], ""]
else:
fixed_item["texts"] = ["", ""]
# 限制texts最多包含两个元素
if len(fixed_item["texts"]) > 2:
fixed_item["texts"] = fixed_item["texts"][:2]
elif len(fixed_item["texts"]) < 2:
while len(fixed_item["texts"]) < 2:
fixed_item["texts"].append("")
fixed_data.append(fixed_item)
# 如果数据是字符串
elif isinstance(data, str):
self.logger.warning(f"数据是字符串格式: '{data}',尝试转换为标准格式")
# 尝试解析字符串格式,例如"性价比|必打卡"
# 处理文本列表
texts = []
if "|" in data:
texts = data.split("|")
else:
texts = [data, ""]
if "texts" in data and isinstance(data["texts"], list):
texts = [self._preprocess_for_json(text) if text else "" for text in data["texts"]] # 应用预处理
# 确保文本列表至少有两个元素
while len(texts) < 2:
texts.append("")
fixed_data.append({
"index": 1,
"main_title": "",
"index": data.get("index", 1),
"main_title": main_title,
"texts": texts
})

View File

@ -4,13 +4,14 @@
内容审核模块检查生成的内容是否符合产品资料要求并提供修改建议
"""
import simplejson as json
import json
import logging
import re
import os
import time
import traceback
import sys
import base64
import re
sys.path.append('/root/autodl-tmp/TravelContentCreator') # 添加项目根目录
from core.ai_agent import AI_Agent
@ -107,18 +108,8 @@ class ContentJudger:
}
输出结果:
{ "analysis" : "
1观察文案标题和内容可以看出此文案主要面向亲子出游人群因此修改后的文案也应该围绕亲子出游这一主题
2文章标题字数为28个字超过19个字因此属于不符内容由于要求中提到尽量保留emoji并且标题中数字后面的""字应删去所以修改为五一遛娃👶必囤喜来登1088景观房
3产品资料中未提及儿童乐园开放时间和儿童乐园配置但文案中提到儿童乐园10:00-20:00全程开放滑梯/积木/绘本一应俱全因此属于不符内容应修改为儿童乐园免费儿童乐园和丰富的游乐设施让孩子们可以尽情玩耍
4产品材料中未提及户外泳池开放时间和消毒频次但文案中提到户外泳池9:00-18:00恒温开放五一期间每日消毒3次因此属于不符内容应修改为户外泳池酒店配有户外无边泳池供大人小孩一同享受清凉时光
5产品材料中未提及健身房开放时间与具体细节但文案中提到健身房8:00-22:00配备亲子瑜伽课程需提前预约因此属于不符内容应修改为健身房酒店提供免费健身中心方便您和家人一起强身健体
6产品材料中未提及餐厅硬件配置但文案中提到自助晚餐隐藏彩蛋儿童餐区设独立洗手台+热食保温柜因此属于虚构内容应修改为自助餐厅供应鲜美海鲜精美甜品等任君选择大人小孩都爱吃
7产品材料中未提及酒店安保措施但文案中提到安全保障全区域监控+24小时安保巡逻因此属于不符内容应修改为安全保障酒店设有完善的监控系统和安保措施无需担心您与家人的安全
8产品材料中未提及房内配有加厚床垫/卡通洗漱杯/尿布台无需额外购买因此属于不符内容应回顾产品资料中关于房内配置的内容修改为房内配置55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗尽览蕉门河风景尽享亲子度假时光
9产品材料中未提及五一专属加码但文案中提到5月1-5日期间入住凭房卡可免费领取儿童防晒冰袖+湿巾礼包因此属于不符内容应回顾产品资料找到现有文案未提及的产品特色修改为套餐专属福利1豪华客房一间一晚(周一至四只开放双床房) 22大1小自助早晚餐 3赠送2大1小水鸟世界门票酒店前台领取无需额外购买
10产品资料中未提及水鸟世界门票领取有时间限制但文案中提到水鸟世界门票需提前1小时至前台领取纸质票因此属于不符内容应修改为酒店前台领取水鸟世界纸质门票
综合以上分析结果将修改应用到原文案中得到修改后的文案",
{
"analysis" : "1、观察文案标题和内容可以看出此文案主要面向亲子出游人群因此修改后的文案也应该围绕亲子出游这一主题。\n2、文章标题字数为28个字超过19个字因此属于不符内容。由于要求中提到尽量保留emoji并且标题中数字后面的""字应删去所以修改为五一遛娃👶必囤喜来登1088景观房\n3、产品资料中未提及儿童乐园开放时间和儿童乐园配置但文案中提到儿童乐园10:00-20:00全程开放滑梯/积木/绘本一应俱全,因此属于不符内容。应修改为:儿童乐园:免费儿童乐园和丰富的游乐设施,让孩子们可以尽情玩耍。\n4、产品材料中未提及户外泳池开放时间和消毒频次但文案中提到户外泳池9:00-18:00恒温开放五一期间每日消毒3次因此属于不符内容。应修改为户外泳池酒店配有户外无边泳池供大人小孩一同享受清凉时光。 \n5、产品材料中未提及健身房开放时间与具体细节但文案中提到健身房8:00-22:00配备亲子瑜伽课程需提前预约因此属于不符内容。应修改为健身房酒店提供免费健身中心方便您和家人一起强身健体。\n6、产品材料中未提及餐厅硬件配置但文案中提到自助晚餐隐藏彩蛋儿童餐区设独立洗手台+热食保温柜,因此属于虚构内容。应修改为:自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃!\n7、产品材料中未提及酒店安保措施但文案中提到安全保障全区域监控+24小时安保巡逻因此属于不符内容。应修改为安全保障酒店设有完善的监控系统和安保措施无需担心您与家人的安全。\n8、产品材料中未提及房内配有加厚床垫/卡通洗漱杯/尿布台无需额外购买因此属于不符内容。应回顾产品资料中关于房内配置的内容修改为房内配置55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光。\n9、产品材料中未提及五一专属加码但文案中提到5月1-5日期间入住凭房卡可免费领取儿童防晒冰袖+湿巾礼包因此属于不符内容。应回顾产品资料找到现有文案未提及的产品特色修改为套餐专属福利1、豪华客房一间一晚(周一至四只开放双床房) 2、2大1小自助早晚餐 3、赠送2大1小水鸟世界门票酒店前台领取无需额外购买。\n10、产品资料中未提及水鸟世界门票领取有时间限制但文案中提到水鸟世界门票需提前1小时至前台领取纸质票因此属于不符内容。应修改为酒店前台领取水鸟世界纸质门票\n综合以上分析结果,将修改应用到原文案中,得到修改后的文案。",
"title": "五一遛娃👶必囤喜来登1088景观房",
"content": "五一不想挤人潮?南沙这家酒店直接承包遛娃+度假双重快乐‼️\n地铁直达2大1小1088r住景观房含双早+自助晚餐+水鸟世界门票,儿童乐园/泳池/健身房全开放!\n🌟【遛娃刚需全配齐】\n✅ 儿童乐园:酒店设有免费儿童乐园,提供丰富的游乐设施,让孩子们尽情玩耍\n✅ 户外泳池:酒店配有户外无边泳池,供大人小孩一同享受清凉时光 \n✅ 健身房:酒店提供免费健身中心,适合家庭成员共同锻炼。\n\n📍【1小时玩转南沙】\n① 南沙天后宫车程20分钟穿汉服拍大片听妈祖传说涨知识\n② 南沙湿地公园40分钟5月芦苇摇曳带娃认鸟类+乘船探秘\n③ 十九涌海鲜街45分钟现捞现煮生猛海鲜人均50r吃到撑 \n\n🍽️【家长友好细节】 \n• 自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃 \n• 房内配置55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光 \n• 安全保障:酒店设有完善的监控系统和安保措施,全力保障您与家人的安全 \n\n🎁【套餐专属福利】\n1、豪华客房一间一晚(周一至四只开放双床房) \n2、2大1小自助早晚餐 \n3、赠送2大1小水鸟世界门票酒店前台领取无需额外购买 \n\n📌Tips \n1. 周一至周四仅限双床房型,周五起可选大床房 \n2. 酒店前台领取水鸟世界纸质门票 \n3. 地铁四号线金洲站下车打车15分钟直达酒店 \n\n这个五一,南沙喜来登让你躺着遛娃!不用长途跋涉,家门口就能玩出仪式感~\n"
}
@ -163,125 +154,157 @@ class ContentJudger:
logging.error(f"从PromptManager获取系统提示词失败: {e}")
return False
def judge_content(self, product_info, content, temperature=0.2, top_p=0.5, presence_penalty=0.0):
def _split_content(self, result):
"""
审核内容是否符合产品资料并提供修改建议
参考tweet_generator的处理方式解析AI返回的内容
Args:
product_info: 产品资料信息字符串
content_json: 需要审核的内容JSON对象或JSON字符串
temperature: 温度参数控制随机性
top_p: 核采样参数
presence_penalty: 存在惩罚参数
result: AI返回的原始结果
Returns:
dict: 审核后的结果JSON包含修改后的title和content以及judge_success状态
dict: 解析后的JSON数据
"""
try:
# 处理AI可能返回的思考部分
processed_result = result
if "</think>" in result:
processed_result = result.split("</think>")[1] # 取</think>标签后的内容
# 直接尝试解析JSON
json_data = json.loads(processed_result)
json_data["error"] = False
json_data["judge_success"] = True
return json_data
except json.JSONDecodeError as json_err:
# JSON解析失败记录错误并尝试更基本的处理方法
logging.warning(f"解析内容时出错: {json_err}, 尝试提取JSON部分")
try:
# 尝试找到JSON部分从第一个{到最后一个}
json_start = processed_result.find('{')
json_end = processed_result.rfind('}') + 1
if json_start >= 0 and json_end > json_start:
json_str = processed_result[json_start:json_end]
json_data = json.loads(json_str)
json_data["error"] = False
json_data["judge_success"] = True
return json_data
except Exception as e:
logging.error(f"尝试提取JSON部分失败: {e}")
except Exception as e:
logging.error(f"解析内容时出错: {e}")
# 所有解析方法都失败,返回一个默认结果
return {
"title": "",
"content": "",
"error": True,
"judge_success": False,
"analysis": f"内容解析失败,错误信息: {str(e)}"
}
def judge_content(self, product_info, content, temperature=0.2, top_p=0.5, presence_penalty=0.0):
"""审核内容"""
logging.info("开始内容审核流程")
# 构建用户提示词
user_prompt = self._build_user_prompt(product_info, content)
response_id = int(time.time())
try:
# 调用AI模型进行内容审核
logging.info("调用AI模型进行内容审核")
start_time = time.time()
# 使用AI_Agent的工作方法
# 调用AI模型
result, _, _ = self.ai_agent.work(
system_prompt=self._system_prompt,
user_prompt=user_prompt,
file_folder=None, # 不使用文件夹
file_folder=None,
temperature=self._temperature,
top_p=self._topp,
presence_penalty=self._presence_penatly,
)
end_time = time.time()
logging.info(f"AI模型响应完成耗时{end_time - start_time:.2f}")
# 保存原始响应以便调试
self._save_response(result, response_id)
# 保存原始响应用于调试
response_log_dir = "/root/autodl-tmp/TravelContentCreator/log/judge_responses"
os.makedirs(response_log_dir, exist_ok=True)
response_log_file = f"{response_log_dir}/response_{int(time.time())}.txt"
with open(response_log_file, "w", encoding="utf-8") as f:
f.write(result)
logging.info(f"原始响应已保存到: {response_log_file}")
# 使用简化的解析方法处理响应
content_json = self._split_content(result)
# 检查解析结果是否有错误
if content_json.get("error", False):
logging.warning(f"内容解析失败,使用原内容")
return self._create_fallback_result(content)
# 检查必要字段是否存在
if "title" not in content_json or "content" not in content_json:
logging.warning(f"解析结果缺少必要字段 'title''content'")
content_json["judge_success"] = False
return self._create_fallback_result(content)
# 添加Base64编码内容
result_dict = {
"judge_success": content_json.get("judge_success", True),
"judged": True,
"title": content_json["title"],
"content": content_json["content"],
"title_base64": base64.b64encode(content_json["title"].encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode(content_json["content"].encode('utf-8')).decode('utf-8')
}
# 如果有analysis字段也包含
if "analysis" in content_json:
result_dict["analysis"] = content_json["analysis"]
result_dict["analysis_base64"] = base64.b64encode(content_json["analysis"].encode('utf-8')).decode('utf-8')
return result_dict
# 提取修改后的内容
modified_content = self._extract_modified_content(result)
if modified_content:
logging.info("成功提取修改后的内容")
# 添加judge_success字段
modified_content["judge_success"] = True
# 对内容进行最终清理确保可以安全序列化为JSON
modified_content = self._prepare_content_for_serialization(modified_content)
# 记录处理后的内容用于调试
debug_log_file = f"{response_log_dir}/processed_{int(time.time())}.json"
try:
serialized_content = json.dumps(modified_content, ensure_ascii=False, allow_nan=True, indent=2)
with open(debug_log_file, "w", encoding="utf-8") as f:
f.write(serialized_content)
logging.info(f"处理后的内容已保存到: {debug_log_file}")
except Exception as e:
logging.error(f"尝试记录处理后内容时序列化失败: {e}")
with open(debug_log_file, "w", encoding="utf-8") as f:
f.write(f"序列化失败: {str(e)}\n\n")
f.write(f"title: {modified_content.get('title', 'N/A')}\n")
f.write(f"content前100字符: {str(modified_content.get('content', 'N/A'))[:100]}")
# 验证序列化是否成功
try:
json.dumps(modified_content, ensure_ascii=False, allow_nan=True)
logging.info("内容可以安全序列化为JSON")
except Exception as e:
logging.error(f"验证序列化时出错: {e}")
# 找出导致错误的字段
for key, value in modified_content.items():
if isinstance(value, str):
try:
json.dumps(value, ensure_ascii=False)
except Exception as sub_e:
logging.error(f"字段 '{key}' 无法序列化: {sub_e}")
# 尝试定位问题字符
for i, char in enumerate(value):
try:
json.dumps(char, ensure_ascii=False)
except:
logging.error(f"位置 {i}, 字符 '{char}' (Unicode: U+{ord(char):04X}) 导致错误")
modified_content["raw_result"] = str(e)
modified_content["error"] = True
return modified_content
else:
logging.error("无法从响应中提取有效内容")
# 尝试使用原始内容并标记审核失败
if isinstance(content, dict) and "title" in content and "content" in content:
result_content = {
"title": content.get("title", "提取失败"),
"content": content.get("content", "无法从响应中提取有效内容"),
"judge_success": False
}
# 确保可以序列化
return self._prepare_content_for_serialization(result_content)
result_content = {
"title": "提取失败",
"content": "无法从响应中提取有效内容",
"judge_success": False
}
return self._prepare_content_for_serialization(result_content)
except Exception as e:
logging.exception(f"审核过程中出错: {e}")
result_content = {
"title": "审核失败",
"content": f"审核过程中出错: {str(e)}",
"judge_success": False
}
return self._prepare_content_for_serialization(result_content)
return self._create_fallback_result(content, error_msg=str(e))
def _save_response(self, response, response_id):
"""保存原始响应"""
try:
response_log_dir = "/root/autodl-tmp/TravelContentCreator/log/judge_responses"
os.makedirs(response_log_dir, exist_ok=True)
with open(f"{response_log_dir}/response_{response_id}.txt", "w", encoding="utf-8") as f:
f.write(response)
except Exception as e:
logging.error(f"保存原始响应失败: {e}")
def _create_fallback_result(self, content, error_msg="解析失败"):
"""创建回退结果"""
if isinstance(content, str):
# 尝试解析内容字符串看是否是JSON字符串
try:
content_obj = json.loads(content)
title = content_obj.get("title", "")
content_text = content_obj.get("content", "")
except:
# 不是JSON字符串视为纯文本内容
title = "审核失败"
content_text = content
elif isinstance(content, dict):
# 已经是字典对象
title = content.get("title", "")
content_text = content.get("content", "")
else:
# 其他类型,创建空内容
title = "审核失败"
content_text = f"无法解析内容: {error_msg}"
return {
"judge_success": False,
"judged": True,
"title": title,
"content": content_text,
"title_base64": base64.b64encode(title.encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode(content_text.encode('utf-8')).decode('utf-8'),
"analysis": f"内容审核失败: {error_msg}",
"analysis_base64": base64.b64encode(f"内容审核失败: {error_msg}".encode('utf-8')).decode('utf-8')
}
def _build_user_prompt(self, product_info, content_gen):
"""
构建用户提示词
@ -293,367 +316,16 @@ class ContentJudger:
Returns:
str: 构建好的用户提示词
"""
# 确保content_gen为字符串格式
if isinstance(content_gen, dict):
content_str = f"title: {content_gen.get('title', '')}\n\ncontent: {content_gen.get('content', '')}"
else:
content_str = str(content_gen)
return f"""
## 产品资料(真实信息,作为判断依据):
{product_info}
## 运营生成的文案(需要审核的内容):
{content_gen}
"""
def _extract_modified_content(self, result_text):
"""从检测结果文本中提取修改后的文案内容"""
try:
processed_text = result_text # Work on a copy of the input text
# 记录原始文本前100个字符用于调试
logging.debug(f"原始响应文本前100字符: {result_text[:100]}")
# 尝试方法1: 使用</think>标签分离内容
if "</think>" in processed_text:
processed_text = processed_text.split("</think>", 1)[1].strip()
logging.debug("检测到</think>标签并分离内容")
# 尝试方法2: 预处理文本并尝试解析JSON
try:
# 彻底清理文本去除所有可能影响JSON解析的控制字符
cleaned_text = self._sanitize_json_text(processed_text)
logging.debug(f"清理后文本前100字符: {cleaned_text[:100]}")
content_json = json.loads(cleaned_text)
if "title" in content_json and "content" in content_json:
logging.info("成功通过JSON解析提取内容")
title = content_json.get("title", "").strip()
content = content_json.get("content", "").strip()
analysis = content_json.get("analysis", "")
logging.debug(f"提取到标题: {title[:30]}...")
return {
"title": title,
"content": content,
"analysis": analysis
}
except json.JSONDecodeError as e:
logging.warning(f"JSON解析失败: {e},将尝试其他提取方法")
# 记录更多错误信息以便调试
error_position = e.pos
error_context = cleaned_text[max(0, error_position-30):min(len(cleaned_text), error_position+30)]
logging.debug(f"错误位置附近的文本: {error_context}")
logging.debug(f"错误行列: 行 {e.lineno}, 列 {e.colno}")
# 尝试方法3: 从文本中提取JSON格式部分
json_start = processed_text.find('{')
json_end = processed_text.rfind('}') + 1
if json_start >= 0 and json_end > json_start:
json_str = processed_text[json_start:json_end]
logging.debug(f"找到JSON字符串长度: {len(json_str)}前100字符: {json_str[:100]}")
# 清理可能破坏JSON解析的控制字符
json_str_cleaned = self._sanitize_json_text(json_str)
try:
content_json = json.loads(json_str_cleaned)
if "title" in content_json and "content" in content_json:
logging.info("成功从文本中提取JSON部分并解析")
return {
"title": content_json.get("title", "").strip(),
"content": content_json.get("content", "").strip(),
"analysis": content_json.get("analysis", "")
}
except json.JSONDecodeError as e:
logging.warning(f"JSON子串解析失败: {e},将尝试正则表达式提取")
# 保存导致错误的JSON字符串到文件
self._save_problematic_json(json_str_cleaned, e)
# 尝试方法4: 手动解析JSON格式的关键字段
try:
logging.debug("尝试手动解析JSON结构")
manual_result = self._manual_json_extract(processed_text)
if manual_result and "title" in manual_result and "content" in manual_result:
logging.info("成功通过手动解析JSON提取内容")
return manual_result
except Exception as e:
logging.warning(f"手动解析JSON失败: {e}")
# 尝试方法5: 使用正则表达式提取
logging.debug("尝试使用正则表达式提取")
# 更强大的正则表达式,处理多行内容
title_match = re.search(r'"title"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
content_match = re.search(r'"content"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
analysis_match = re.search(r'"analysis"\s*:\s*"((?:[^"\\]|\\.|[\r\n])+)"', processed_text, re.DOTALL)
if title_match and content_match:
logging.info("成功使用正则表达式提取标题和内容")
return {
"title": title_match.group(1).replace('\\"', '"').strip(),
"content": content_match.group(1).replace('\\"', '"').strip(),
"analysis": analysis_match.group(1).replace('\\"', '"').strip() if analysis_match else ""
}
# 尝试方法6: 查找使用单引号的内容
logging.debug("尝试查找使用单引号的内容")
title_match = re.search(r'"title"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
content_match = re.search(r'"content"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
analysis_match = re.search(r'"analysis"\s*:\s*\'((?:[^\'\\]|\\.|[\r\n])+)\'', processed_text, re.DOTALL)
if title_match and content_match:
logging.info("成功使用单引号正则表达式提取内容")
return {
"title": title_match.group(1).strip(),
"content": content_match.group(1).strip(),
"analysis": analysis_match.group(1).strip() if analysis_match else ""
}
# 尝试方法7: 使用非标准格式提取
logging.debug("尝试非标准格式提取")
title_pattern = re.compile(r'["""]?title["""]?[:]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
content_pattern = re.compile(r'["""]?content["""]?[:]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
analysis_pattern = re.compile(r'["""]?analysis["""]?[:]\s*["""]([^"""]+)["""]', re.IGNORECASE | re.DOTALL)
title_match = title_pattern.search(processed_text)
content_match = content_pattern.search(processed_text)
analysis_match = analysis_pattern.search(processed_text)
if title_match and content_match:
logging.info("成功使用灵活模式匹配提取内容")
return {
"title": title_match.group(1).strip(),
"content": content_match.group(1).strip(),
"analysis": analysis_match.group(1).strip() if analysis_match else ""
}
logging.warning(f"所有提取方法失败响应前300字符: {processed_text[:300]}...")
return None # 所有方法失败时的回退选项
except Exception as e:
logging.error(f"内容提取过程中发生意外错误: {e}\n{traceback.format_exc()}")
return None
def _sanitize_json_text(self, text):
"""彻底清理文本确保可以安全解析为JSON同时保留换行符"""
# 步骤1: 处理控制字符,但保留换行符、回车和制表符
cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
# 不再将实际换行符转换为\n字符串保留原始换行符
# cleaned = cleaned.replace('\n', '\\n').replace('\r', '\\r')
# 步骤3: 处理内容字段中开始或结束可能存在的多余空格或引号
cleaned = re.sub(r'"content"\s*:\s*"\s*', '"content":"', cleaned)
cleaned = re.sub(r'"\s*,', '",', cleaned)
# 步骤4: 处理未转义的引号和反斜杠
cleaned = re.sub(r'(?<!\\)"(?=(?:(?:[^"\\]|\\.)*"(?:[^"\\]|\\.)*")*[^"\\]*$)', '\\"', cleaned)
# 步骤5: 处理可能的Unicode转义
cleaned = re.sub(r'\\u([0-9a-fA-F]{4})', lambda m: chr(int(m.group(1), 16)), cleaned)
return cleaned
def _manual_json_extract(self, text):
"""手动解析JSON结构提取关键字段"""
try:
# 使用状态机方式手动解析
result = {}
# 查找title字段
title_start = text.find('"title"')
if title_start >= 0:
colon_pos = text.find(':', title_start)
if colon_pos > 0:
quote_pos = text.find('"', colon_pos)
if quote_pos > 0:
end_quote_pos = text.find('"', quote_pos + 1)
while end_quote_pos > 0 and text[end_quote_pos-1] == '\\':
end_quote_pos = text.find('"', end_quote_pos + 1)
if end_quote_pos > 0:
result['title'] = text[quote_pos+1:end_quote_pos].replace('\\"', '"').strip()
# 查找content字段
content_start = text.find('"content"')
if content_start >= 0:
colon_pos = text.find(':', content_start)
if colon_pos > 0:
quote_pos = text.find('"', colon_pos)
if quote_pos > 0:
# 查找非转义双引号
pos = quote_pos + 1
content_end = -1
while pos < len(text):
if text[pos] == '"' and (pos == 0 or text[pos-1] != '\\'):
content_end = pos
break
pos += 1
if content_end > 0:
content = text[quote_pos+1:content_end].replace('\\"', '"')
# 处理反斜杠转义的换行符,如果字符串中有'\n',将其转换为实际换行符
# 但如果已经是实际的换行符,则保留
if '\\n' in content:
content = content.replace('\\n', '\n')
if '\\r' in content:
content = content.replace('\\r', '\r')
result['content'] = content.strip()
# 查找analysis字段
analysis_start = text.find('"analysis"')
if analysis_start >= 0:
colon_pos = text.find(':', analysis_start)
if colon_pos > 0:
quote_pos = text.find('"', colon_pos)
if quote_pos > 0:
pos = quote_pos + 1
analysis_end = -1
while pos < len(text):
if text[pos] == '"' and (pos == 0 or text[pos-1] != '\\'):
analysis_end = pos
break
pos += 1
if analysis_end > 0:
analysis = text[quote_pos+1:analysis_end].replace('\\"', '"')
# 处理反斜杠转义的换行符
if '\\n' in analysis:
analysis = analysis.replace('\\n', '\n')
if '\\r' in analysis:
analysis = analysis.replace('\\r', '\r')
result['analysis'] = analysis.strip()
return result if 'title' in result and 'content' in result else None
except Exception as e:
logging.error(f"手动解析过程中出错: {e}")
return None
def _save_problematic_json(self, json_text, error):
"""保存导致解析错误的JSON字符串用于调试"""
try:
error_log_dir = "/root/autodl-tmp/TravelContentCreator/log/json_errors"
os.makedirs(error_log_dir, exist_ok=True)
error_log_file = f"{error_log_dir}/error_{int(time.time())}.json"
with open(error_log_file, "w", encoding="utf-8") as f:
f.write(f"# 错误信息: {str(error)}\n")
f.write(f"# 错误位置: 行 {error.lineno}, 列 {error.colno}\n")
f.write(json_text)
logging.info(f"已保存问题JSON到: {error_log_file}")
except Exception as e:
logging.error(f"保存问题JSON时出错: {e}")
def test_extraction_from_file(self, response_file_path):
"""
从文件中读取响应并测试提取功能
Args:
response_file_path: 响应文件路径
Returns:
dict: 提取结果
"""
try:
logging.info(f"从文件测试提取: {response_file_path}")
with open(response_file_path, 'r', encoding='utf-8') as f:
response_text = f.read()
result = self._extract_modified_content(response_text)
if result:
logging.info(f"成功从文件提取内容: {result.get('title', '')[:30]}...")
return {"success": True, "result": result}
else:
logging.error(f"从文件中提取内容失败")
return {"success": False, "error": "提取失败"}
except Exception as e:
logging.exception(f"测试提取时发生错误: {e}")
return {"success": False, "error": str(e)}
def _prepare_content_for_serialization(self, content_dict):
"""
对内容进行处理确保可以安全序列化为JSON同时保留emoji字符和换行符
Args:
content_dict: 内容字典
Returns:
dict: 处理后的内容字典
"""
try:
# 创建一个新字典,避免修改原始内容
safe_dict = {}
for key, value in content_dict.items():
# 处理字符串类型的值
if isinstance(value, str):
# 第一步:清理控制字符,但保留换行符、回车和制表符
safe_value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)
# 确保文本中的反斜杠换行符(如\\n)被转换为实际换行符
if '\\n' in safe_value:
safe_value = safe_value.replace('\\n', '\n')
if '\\r' in safe_value:
safe_value = safe_value.replace('\\r', '\r')
# 第二步将emoji字符和其他非ASCII字符转换为相应的Unicode转义序列
char_list = []
for char in safe_value:
# 保留常见的控制字符(换行符、回车、制表符)
if char in '\n\r\t':
char_list.append(char)
elif ord(char) > 127: # 非ASCII字符
# 尝试保留高位字符包括emoji
try:
# 验证这个字符是否可以安全序列化
json.dumps(char, ensure_ascii=False)
char_list.append(char)
except:
# 如果这个字符无法序列化使用其Unicode码点的字符串表示
char_list.append(f"\\u{ord(char):04x}")
else:
char_list.append(char)
processed_value = ''.join(char_list)
# 最终验证这个值是否可以安全序列化
try:
json.dumps(processed_value, ensure_ascii=False)
safe_dict[key] = processed_value
except Exception as e:
logging.warning(f"处理后的'{key}'值仍无法序列化: {e},将进行更严格处理")
# 更严格的处理保留ASCII字符和基本控制字符
safe_value = ''
for c in processed_value:
if c in '\n\r\t' or (32 <= ord(c) < 127):
safe_value += c
safe_dict[key] = safe_value
else:
safe_dict[key] = value
# 最终验证整个字典是否可序列化
try:
# 使用ensure_ascii=False允许非ASCII字符直接出现在JSON中
# 使用allow_nan=True允许特殊浮点数值
json_str = json.dumps(safe_dict, ensure_ascii=False, allow_nan=True)
# 验证生成的JSON是否有效
json.loads(json_str)
except Exception as e:
logging.error(f"最终字典序列化验证失败: {e}")
# 如果依然失败,返回一个绝对安全的结果,但保留换行符
safe_content = ''
original_content = content_dict.get("content", "内容包含无法安全序列化的字符")
for c in original_content:
if c in '\n\r\t' or (32 <= ord(c) < 127):
safe_content += c
return {
"title": re.sub(r'[^\x20-\x7E]', '', content_dict.get("title", "序列化处理失败")),
"content": safe_content,
"judge_success": content_dict.get("judge_success", False),
"error": True,
"raw_result": str(e)
}
return safe_dict
except Exception as e:
logging.error(f"序列化准备过程中发生意外错误: {e}")
return {
"title": "序列化处理失败",
"content": "处理内容时发生意外错误",
"judge_success": False,
"error": True,
"raw_result": str(e)
}
{content_str}
"""

View File

@ -281,17 +281,65 @@ class FileSystemOutputHandler(OutputHandler):
return ""
return ''.join(c for c in text if 32 <= ord(c) <= 126)
def _preprocess_for_json(self, text):
"""预处理文本,将换行符转换为\\n形式保证JSON安全"""
if not isinstance(text, str):
return text
# 将所有实际换行符替换为\\n字符串
return text.replace('\n', '\\n').replace('\r', '\\r')
def handle_poster_configs(self, run_id: str, topic_index: int, config_data: list | dict):
"""Saves the complete poster configuration list/dict for a topic."""
run_dir = self._get_run_dir(run_id)
config_path = os.path.join(run_dir, f"topic_{topic_index}_poster_configs.json")
"""处理海报配置数据"""
# 处理海报配置数据
try:
with open(config_path, 'w', encoding='utf-8') as f_cfg_topic:
# 不使用自定义编码器使用标准json
json.dump(config_data, f_cfg_topic, ensure_ascii=False, indent=4, ignore_nan=True)
logging.info(f"Saved complete poster configurations for topic {topic_index} to: {config_path}")
except Exception as save_err:
logging.error(f"Failed to save complete poster configurations for topic {topic_index} to {config_path}: {save_err}")
# 创建目标目录
variant_dir = os.path.join(self._get_run_dir(run_id), f"{topic_index}_1")
os.makedirs(variant_dir, exist_ok=True)
# 确保配置数据是可序列化的
processed_configs = []
if isinstance(config_data, list):
for config in config_data:
processed_config = {}
# 处理索引字段
processed_config["index"] = config.get("index", 0)
# 处理标题字段应用JSON预处理
main_title = config.get("main_title", "")
processed_config["main_title"] = self._preprocess_for_json(main_title)
# 处理文本字段列表对每个文本应用JSON预处理
texts = config.get("texts", [])
processed_texts = []
for text in texts:
processed_texts.append(self._preprocess_for_json(text))
processed_config["texts"] = processed_texts
processed_configs.append(processed_config)
else:
# 如果不是列表,可能是字典或其他格式,尝试转换
if isinstance(config_data, dict):
# 处理单个配置字典
processed_config = {}
processed_config["index"] = config_data.get("index", 0)
processed_config["main_title"] = self._preprocess_for_json(config_data.get("main_title", ""))
texts = config_data.get("texts", [])
processed_texts = []
for text in texts:
processed_texts.append(self._preprocess_for_json(text))
processed_config["texts"] = processed_texts
processed_configs.append(processed_config)
# 保存配置到JSON文件
config_file_path = os.path.join(variant_dir, f"topic_{topic_index}_poster_configs.json")
with open(config_file_path, 'w', encoding='utf-8') as f:
json.dump(processed_configs, f, ensure_ascii=False, indent=4, cls=self.SafeJSONEncoder)
logging.info(f"Successfully saved poster configs to {config_file_path}")
except Exception as e:
logging.error(f"Error saving poster configs: {e}")
traceback.print_exc()
def handle_generated_image(self, run_id: str, topic_index: int, variant_index: int, image_type: str, image_data, output_filename: str, metadata: dict = None):
"""处理生成的图像对于笔记图像和额外配图保存到image目录其他类型保持原有路径结构"""

View File

@ -620,15 +620,47 @@ def generate_posters_for_topic(topic_item: dict,
if os.path.exists(content_path):
with open(content_path, 'r', encoding='utf-8') as f_content:
content_data = json.load(f_content)
# 支持Base64编码格式的文件
if 'title_base64' in content_data and 'content_base64' in content_data:
import base64
logging.info(f"检测到Base64编码的内容文件: {content_path}")
# 解码Base64内容
try:
title = base64.b64decode(content_data.get('title_base64', '')).decode('utf-8')
content = base64.b64decode(content_data.get('content_base64', '')).decode('utf-8')
# 创建包含解码内容的新数据对象
decoded_data = {
'title': title,
'content': content,
'judge_success': content_data.get('judge_success', True),
'judged': content_data.get('judged', True)
}
# 如果有标签,也解码
if 'tags_base64' in content_data:
tags = base64.b64decode(content_data.get('tags_base64', '')).decode('utf-8')
decoded_data['tags'] = tags
loaded_content_list.append(decoded_data)
logging.debug(f" 已成功解码并加载Base64内容: {content_path}")
continue
except Exception as decode_error:
logging.error(f" 解码Base64内容时出错: {decode_error},跳过此文件")
continue
# 常规JSON格式检查
if isinstance(content_data, dict) and 'title' in content_data and 'content' in content_data:
loaded_content_list.append(content_data)
logging.debug(f" Successfully loaded content from: {content_path}")
loaded_content_list.append(content_data)
logging.debug(f" Successfully loaded content from: {content_path}")
else:
logging.warning(f" Content file {content_path} has invalid format. Skipping.")
logging.warning(f" Content file {content_path} has invalid format. Skipping.")
else:
logging.warning(f" Content file not found for variant {variant_index}: {content_path}. Skipping.")
except json.JSONDecodeError:
logging.error(f" Error decoding JSON from content file: {content_path}. Skipping.")
logging.error(f" Error decoding JSON from content file: {content_path}. Skipping.")
except Exception as e:
logging.exception(f" Error loading content file {content_path}: {e}")