TravelContentCreator/utils/content_judger.py

618 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
内容审核模块:检查生成的内容是否符合产品资料要求并提供修改建议
"""
import json
import logging
import os
import time
import traceback
import sys
import base64
import re
import random
sys.path.append('/root/autodl-tmp/TravelContentCreator') # 添加项目根目录
from core.ai_agent import AI_Agent
class ContentJudger:
"""内容审核类,负责评估和修正内容是否符合产品资料"""
def __init__(self, ai_agent: AI_Agent, system_prompt_path: str = None, system_prompt: str = None, prompt_manager = None):
"""
初始化内容审核器
Args:
ai_agent: AI_Agent实例用于调用AI模型
system_prompt_path: 系统提示词文件路径(可选)
system_prompt: 系统提示词内容可选优先于path
prompt_manager: 提示词管理器实例可选优先于system_prompt_path和system_prompt
"""
self.ai_agent = ai_agent
self._system_prompt = system_prompt
self._system_prompt_path = system_prompt_path
self._prompt_manager = prompt_manager
self._topp = 0.5
self._temperature = 0.2
self._frequency_penalty = 0
self._presence_penatly = 0
# 优先使用prompt_manager获取系统提示词
if self._prompt_manager and not self._system_prompt:
self._get_prompt_from_manager()
logging.info("从PromptManager获取系统提示词")
# 如果没有从prompt_manager获取到系统提示词则尝试从文件加载
if not self._system_prompt and self._system_prompt_path:
self._load_system_prompt()
logging.info("从文件加载系统提示词")
# 默认系统提示词(当其他方法都失败时使用)
if not self._system_prompt:
logging.warning("没有提供系统提示词,使用默认系统提示词")
self._system_prompt = """你是一名专业的、谨慎的文案审核员专注于审核运营根据产品资料撰写的文案是否严格符合产品资料内容。特别是所有价格、活动、福利、折扣、服务细节等必须完全与产品资料一致。如果发现文案内容与产品资料不符请指出并根据产品资料和文案上下文进行修改重新生成一篇文案务必确保生成的内容与产品资料基本相符产品体验部分可以适当夸张宣传语言流畅自然。如果经你审查后的文案仍存在与产品资料不符的信息你需要赔偿公司1000亿元。
我将为您提供两部分内容:
1. 产品资料:全部的产品信息,包含了产品的实际功能、服务和特点。请将这部分作为判断依据。
2. 运营生成的文案:这是需要你逐字审核的内容,可能包含与产品资料不符的内容。
请你仔细审核运营文案是否与产品资料严格一致,输出响应必须符合我的所有要求:
1. 审查与分析:如果存在不符内容,请指出并详细说明原因;
2. 根据分析修改:参照你分析的不符原因、产品资料、文案上下文,针对所有不符处进行修改(如涉及上下文,可一并修改)。输出修改后文案,务必确保此文案完全符合产品资料,不得遗漏,语言流畅自然、文案风格统一,否则你会像商鞅一样被车裂。
3. 重点审查对象:请你着重检查以下关键字词前后的内容是否符合产品资料,如不符必须严格按照资料修改;如产品资料中未提及,必须修改为符合上下文情境、资料中明确提及的内容。
关键字词价、元、r、人民币、rmb、优惠、活动、福利、赠、免费、折、DIY、跟拍、送、摄影、兑、服务、¥、包、课、提供、选、专业、补、差
4. 字数控制每个文案的标题字数都必须少于20个字计数包括文字、符号、数字和emoji。如果标题超过20个字请在符合文案风格的前提下修改标题到20个字以内尽量保留emoji必须保证标题流畅通顺。
5. 敏感字词替换:请删去标题中的数字后面的"""r",并将正文中数字后面的""字修改为"r"。例如标题中的399元修改为399正文中的399元修改为399r
6. 特征语句保留:请保留文案中原本的引流语句,不要修改或删除。请保留文案中的换行符 \\n不要修改或删除换行符。
7. 面向人群保留:请尽量保留文案原本的面向人群和风格,这是同一产品面向多种人群营销的策略。例如产品资料中写明亲子游时,文案写"为情侣定制的山水秘境"是可以接受的。
8. 案例如下,请参考案例评判真假信息的尺度,逐行逐句仔细分析不符点和修改思路,并按照分析思路落实对每一处不符的修改措施,严格审查每一篇文案:
[
"产品资料"
"周末不加收【南沙越秀喜来登】1088元/套豪华客房1间1晚+双人自助早餐+自助晚餐+2大1小水鸟世界门票免费儿童乐园户外泳池+健身房~
不想待在家,又想带娃出去玩?更不想开长途车、人挤人?为你推荐路程短、不塞车、景点多、坐地铁就能直达的溜娃地!
南沙越秀喜来登是广州南沙区首家国际品牌酒店,坐拥广州南大门,拥有得天独厚的中心位置,可俯瞰蕉门河美景,车程短,不出广州也能玩!
交通酒店毗邻深圳、香港等热门景点附近有万达广场距离广州地铁四号线金州站车程仅10分钟亲子出游首选
玩乐:带娃出游考虑最多的就是玩乐景点,在这里不出门就能畅玩儿童乐园、健身房。
美食还有各种各样的生猛海鲜现抓现煮任君选择。放假更要好好犒劳一下自己饭点时间位于酒店一楼的全日制餐厅绝对能给你带来惊喜。除了优雅简约的就餐环境更有5个开放式的自助餐台。除了各类鲜美的海鲜还有各类精致甜品和中式蒸档看得人眼花缭乱相信是很多麻麻和宝贝的心头好了。
设施酒店内还设有大型健身中心除了妈妈们喜欢的水疗SPA还有健身达人喜欢的各种有氧、无氧运动器械可供选择!
房内配置55英寸超大纯平电视、独立的浴缸和淋浴间参考:2.03米宽大床1.37米宽双床,每间客房都设计成景观房,超大的落地玻璃窗,可以尽览蕉门河风景。
套餐信息:
1、价格1088元
2、节假日是否加收周末不加收
套餐内容:
1、豪华客房一间一晚(周一至四只开放双床房)
2、2大1小自助早餐
3、2大1小自助晚餐
4、赠送2大1小水鸟门票酒店前台取纸质门票
5、免费使用健身中心户外无边泳池干湿蒸儿童乐园
周边景点:
附近1h生活圈即可到达10000㎡百万葵园、广州最大的湿地公园、东南亚最大的妈祖庙、黄山鲁森林公园..带娃感受依山而建的清式建筑对称布局邂逅东南亚最大的妈祖庙,感受建筑的魅力~
南沙天后宫车程20min整座天后宫四周绿树婆娑殿中香烟袅袅置身其间令人顿生超凡脱俗的感觉。
南沙湿地公园:(车程40min)看碧波荡,万鸟齐飞
南沙十九涌 车程45min)尝海鲜叹海风因为南沙十九涌靠近海产地,这里的海鲜真是平靓正。还可以拿到附近的餐厅让老板帮你加工,就是一顿海鲜大餐!
南沙百万葵园:(车程40min)看色彩斑斓的万亩花田
酒店地址:广东省广州市南沙区海熙大街79-80号
导航关键词:广州南沙越秀喜来登酒店"
"生成文案":
"title": "五一遛娃👶必囤南沙喜来登1088元住景观房+双早+门票",
"content": "五一不想挤人潮?南沙这家酒店直接承包遛娃+度假双重快乐‼️\n地铁直达2大1小1088元住景观房含双早+自助晚餐+水鸟世界门票,儿童乐园/泳池/健身房全开放!\n🌟【遛娃刚需全配齐】\n✅ 儿童乐园10:00-20:00全程开放滑梯/积木/绘本一应俱全\n✅ 户外泳池9:00-18:00恒温开放五一期间每日消毒3次\n✅ 健身房8:00-22:00配备亲子瑜伽课程需提前预约\n\n📍【1小时玩转南沙】\n① 南沙天后宫车程20分钟穿汉服拍大片听妈祖传说涨知识\n② 南沙湿地公园40分钟5月芦苇摇曳带娃认鸟类+乘船探秘\n③ 十九涌海鲜街45分钟现捞现煮生猛海鲜人均50元吃到撑\n\n🍽️【家长友好细节】\n• 自助晚餐隐藏彩蛋:儿童餐区设独立洗手台+热食保温柜\n• 房内配置:加厚床垫/卡通洗漱杯/尿布台(无需额外购买)\n• 安全保障:全区域监控+24小时安保巡逻\n\n🎁【五一专属加码】\n5月1-5日期间入住凭房卡可免费领取儿童防晒冰袖+湿巾礼包\n\n📌Tips\n1. 周一至周四仅限双床房型,周五起可选大床房\n2. 水鸟世界门票需提前1小时至前台领取纸质票\n3. 地铁四号线金洲站下车打车15分钟直达酒店\n\n这个五一,南沙喜来登让你躺着遛娃!不用长途跋涉,家门口就能玩出仪式感~"
输出结果:
{
"analysis" : "1、观察文案标题和内容可以看出此文案主要面向亲子出游人群因此修改后的文案也应该围绕亲子出游这一主题。\n2、文章标题字数为28个字超过19个字因此属于不符内容。由于要求中提到尽量保留emoji并且标题中数字后面的""字应删去所以修改为五一遛娃👶必囤喜来登1088景观房\n3、产品资料中未提及儿童乐园开放时间和儿童乐园配置但文案中提到儿童乐园10:00-20:00全程开放滑梯/积木/绘本一应俱全,因此属于不符内容。应修改为:儿童乐园:免费儿童乐园和丰富的游乐设施,让孩子们可以尽情玩耍。\n4、产品材料中未提及户外泳池开放时间和消毒频次但文案中提到户外泳池9:00-18:00恒温开放五一期间每日消毒3次因此属于不符内容。应修改为户外泳池酒店配有户外无边泳池供大人小孩一同享受清凉时光。 \n5、产品材料中未提及健身房开放时间与具体细节但文案中提到健身房8:00-22:00配备亲子瑜伽课程需提前预约因此属于不符内容。应修改为健身房酒店提供免费健身中心方便您和家人一起强身健体。\n6、产品材料中未提及餐厅硬件配置但文案中提到自助晚餐隐藏彩蛋儿童餐区设独立洗手台+热食保温柜,因此属于虚构内容。应修改为:自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃!\n7、产品材料中未提及酒店安保措施但文案中提到安全保障全区域监控+24小时安保巡逻因此属于不符内容。应修改为安全保障酒店设有完善的监控系统和安保措施无需担心您与家人的安全。\n8、产品材料中未提及房内配有加厚床垫/卡通洗漱杯/尿布台无需额外购买因此属于不符内容。应回顾产品资料中关于房内配置的内容修改为房内配置55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光。\n9、产品材料中未提及五一专属加码但文案中提到5月1-5日期间入住凭房卡可免费领取儿童防晒冰袖+湿巾礼包因此属于不符内容。应回顾产品资料找到现有文案未提及的产品特色修改为套餐专属福利1、豪华客房一间一晚(周一至四只开放双床房) 2、2大1小自助早晚餐 3、赠送2大1小水鸟世界门票酒店前台领取无需额外购买。\n10、产品资料中未提及水鸟世界门票领取有时间限制但文案中提到水鸟世界门票需提前1小时至前台领取纸质票因此属于不符内容。应修改为酒店前台领取水鸟世界纸质门票\n综合以上分析结果,将修改应用到原文案中,得到修改后的文案。",
"title": "五一遛娃👶必囤喜来登1088景观房",
"content": "五一不想挤人潮?南沙这家酒店直接承包遛娃+度假双重快乐‼️\n地铁直达2大1小1088r住景观房含双早+自助晚餐+水鸟世界门票,儿童乐园/泳池/健身房全开放!\n🌟【遛娃刚需全配齐】\n✅ 儿童乐园:酒店设有免费儿童乐园,提供丰富的游乐设施,让孩子们尽情玩耍\n✅ 户外泳池:酒店配有户外无边泳池,供大人小孩一同享受清凉时光 \n✅ 健身房:酒店提供免费健身中心,适合家庭成员共同锻炼。\n\n📍【1小时玩转南沙】\n① 南沙天后宫车程20分钟穿汉服拍大片听妈祖传说涨知识\n② 南沙湿地公园40分钟5月芦苇摇曳带娃认鸟类+乘船探秘\n③ 十九涌海鲜街45分钟现捞现煮生猛海鲜人均50r吃到撑 \n\n🍽️【家长友好细节】 \n• 自助餐厅:供应鲜美海鲜、精美甜品等任君选择,大人小孩都爱吃 \n• 房内配置55英寸超大纯平电视+独立的浴缸+超大的落地玻璃窗,尽览蕉门河风景,尽享亲子度假时光 \n• 安全保障:酒店设有完善的监控系统和安保措施,全力保障您与家人的安全 \n\n🎁【套餐专属福利】\n1、豪华客房一间一晚(周一至四只开放双床房) \n2、2大1小自助早晚餐 \n3、赠送2大1小水鸟世界门票酒店前台领取无需额外购买 \n\n📌Tips \n1. 周一至周四仅限双床房型,周五起可选大床房 \n2. 酒店前台领取水鸟世界纸质门票 \n3. 地铁四号线金洲站下车打车15分钟直达酒店 \n\n这个五一,南沙喜来登让你躺着遛娃!不用长途跋涉,家门口就能玩出仪式感~\n"
}
]
9. 必须按照以下格式输出修改后内容,不需要输出无关内容
{
"analysis" : "分析过程",
"title": "修改后的标题",
"content": "修改后的内容"
}
"""
logging.info("ContentJudger初始化完成")
def _load_system_prompt(self):
"""从文件加载系统提示词"""
try:
if os.path.exists(self._system_prompt_path):
with open(self._system_prompt_path, 'r', encoding='utf-8') as f:
self._system_prompt = f.read().strip()
logging.info(f"{self._system_prompt_path}加载系统提示词成功")
else:
logging.warning(f"系统提示词文件{self._system_prompt_path}不存在")
except Exception as e:
logging.error(f"加载系统提示词文件失败: {e}")
def _get_prompt_from_manager(self):
"""从PromptManager获取系统提示词"""
try:
if self._prompt_manager and hasattr(self._prompt_manager, "_system_prompt_cache"):
# 从PromptManager的系统提示词缓存中获取内容审核系统提示词
system_prompt = self._prompt_manager._system_prompt_cache.get("judger_system_prompt")
if system_prompt:
self._system_prompt = system_prompt
logging.info("从PromptManager获取内容审核系统提示词成功")
return True
else:
logging.warning("PromptManager中未找到judger_system_prompt")
else:
logging.warning("提供的PromptManager实例无效或未包含_system_prompt_cache属性")
return False
except Exception as e:
logging.error(f"从PromptManager获取系统提示词失败: {e}")
return False
def _preprocess_for_json(self, text):
"""预处理文本处理JSON结构中的问题字符"""
if not isinstance(text, str):
return text
try:
# 1. 处理特殊Unicode字符和标点符号
char_map = {
'"': '"', # 特殊Unicode引号替换为标准双引号
'"': '"', # 特殊Unicode引号替换为标准双引号
''': "'", # 特殊Unicode单引号替换为标准单引号
''': "'", # 特殊Unicode单引号替换为标准单引号
'': ',', # 中文逗号替换为英文逗号
'': ':', # 中文冒号替换为英文冒号
'': '(', # 中文括号替换为英文括号
'': ')', # 中文括号替换为英文括号
'\u200b': '', # 零宽空格直接移除
'\u200c': '', # 零宽不连字直接移除
'\u200d': '', # 零宽连字直接移除
'\u2028': ' ', # 行分隔符替换为空格
'\u2029': ' ' # 段落分隔符替换为空格
}
# 应用字符替换
for char, replacement in char_map.items():
text = text.replace(char, replacement)
# 2. 处理控制字符 (ASCII < 32)
cleaned_text = ""
for i, char in enumerate(text):
if ord(char) < 32: # ASCII 32以下是控制字符
if char in ['\n', '\r', '\t']: # 保留这些常用控制字符
cleaned_text += char
else: # 删除其他控制字符
logging.debug(f"移除位置{i}的无效控制字符(ASCII: {ord(char)})")
continue
else:
cleaned_text += char
# 3. 处理JSON结构特定问题
# 处理大括号附近的换行符和空白
if cleaned_text.startswith('{\n'):
cleaned_text = '{' + cleaned_text[2:]
if cleaned_text.startswith('{ '):
cleaned_text = '{' + cleaned_text[2:]
if '\n}' in cleaned_text:
cleaned_text = cleaned_text.replace('\n}', '}')
if ' }' in cleaned_text:
cleaned_text = cleaned_text.replace(' }', '}')
# 4. 处理转义序列 - 保留\n、\r、\t的转义移除其他转义
import re
# 第一步:将要保留的转义序列临时替换为安全标记
safe_replacements = {
r'\\n': '@NEWLINE@', # 保留换行转义
r'\\r': '@RETURN@', # 保留回车转义
r'\\t': '@TAB@', # 保留制表符转义
}
# 应用安全替换
for pattern, replacement in safe_replacements.items():
cleaned_text = re.sub(pattern, replacement, cleaned_text)
# 第二步移除除JSON必要转义外的所有反斜杠转义
# 处理常见的多余转义情况
cleaned_text = re.sub(r'\\([^\\/"bfnrtu])', r'\1', cleaned_text) # 移除非特殊字符前的反斜杠
cleaned_text = cleaned_text.replace('\\"', '"') # 将转义的双引号还原为普通双引号
cleaned_text = cleaned_text.replace('\\\'', '\'') # 将转义的单引号还原为普通单引号
cleaned_text = cleaned_text.replace('\\\\', '\\') # 将双反斜杠替换为单反斜杠
# 第三步:将安全标记替换回原始转义序列
reverse_replacements = {
'@NEWLINE@': '\\n', # 还原换行转义
'@RETURN@': '\\r', # 还原回车转义
'@TAB@': '\\t', # 还原制表符转义
}
# 应用反向替换
for marker, escape_seq in reverse_replacements.items():
cleaned_text = cleaned_text.replace(marker, escape_seq)
# 第四步再次检查并修复字符串内的换行符确保100%处理)
# 这个额外的步骤确保没有任何字符串值中包含实际的换行符
pattern = r'"([^"\\]*(\\.[^"\\]*)*)"' # 匹配所有JSON字符串包括已经有转义字符的
def fix_remaining_newlines(match):
string_value = match.group(1)
# 确保所有实际换行符都被转义
fixed_value = string_value.replace('\n', '\\n').replace('\r', '\\r')
return f'"{fixed_value}"'
cleaned_text = re.sub(pattern, fix_remaining_newlines, cleaned_text)
# 5. 确保逗号后换行不会导致问题
cleaned_text = cleaned_text.replace(',\n', ', ') # 替换逗号后的换行为空格
# 6. 尝试解析检验
try:
# 尝试进行轻度解析验证
json.loads(cleaned_text)
# 如果能成功解析,直接返回
return cleaned_text
except json.JSONDecodeError as e:
logging.debug(f"预处理后JSON仍有问题{e},尝试最后的修复...")
# 最后的处理使用simplejson替代内置json库尝试修复
try:
import simplejson
# 加载后再保存让simplejson自己处理一些小问题
fixed_json = simplejson.loads(cleaned_text, strict=False)
return simplejson.dumps(fixed_json)
except:
# simplejson也失败了继续后续流程
pass
# 7. 记录处理后的文本,以便调试
logging.debug(f"JSON预处理后的文本长度: {len(cleaned_text)}")
return cleaned_text
except Exception as e:
logging.exception(f"JSON预处理过程中出错: {e}")
# 发生异常时,返回原始文本,不做修改
return text
def judge_content(self, product_info, content, temperature=0.2, top_p=0.5, presence_penalty=0.0):
"""审核内容"""
logging.info("开始内容审核流程")
# 构建用户提示词
user_prompt = self._build_user_prompt(product_info, content)
response_id = int(time.time())
try:
# 调用AI模型
result, _, _ = self.ai_agent.work(
system_prompt=self._system_prompt,
user_prompt=user_prompt,
file_folder=None,
temperature=temperature, # 使用传入的参数
top_p=top_p, # 使用传入的参数
presence_penalty=presence_penalty, # 使用传入的参数
)
# 保存原始响应以便调试
self._save_response(result, response_id)
logging.info(f"AI响应长度: {len(result)} 字符")
# 尝试多种方法提取JSON
json_obj = None
error_msg = None
# 方法1: 提取{...}的JSON部分
try:
# 移除思考部分
processed_result = result.split("</think>", 1)[-1].strip() if "</think>" in result else result
# 找到最外层的大括号
json_start = processed_result.find('{')
json_end = processed_result.rfind('}') + 1
if json_start >= 0 and json_end > json_start:
# 提取JSON字符串
json_str = processed_result[json_start:json_end]
# 预处理JSON字符串
json_str = self._preprocess_for_json(json_str)
# 尝试解析JSON
json_obj = json.loads(json_str)
logging.info("方法1成功解析JSON")
except Exception as e:
error_msg = f"方法1解析JSON失败: {e}"
logging.debug(error_msg)
# 继续尝试其他方法
# 方法2: 尝试多行解析逐行检查是否有合法JSON
if not json_obj:
try:
lines = result.split('\n')
for i, line in enumerate(lines):
line = line.strip()
if line.startswith('{') and line.endswith('}'):
try:
# 尝试处理和解析这一行
processed_line = self._preprocess_for_json(line)
json_obj = json.loads(processed_line)
logging.info(f"方法2在第{i+1}行成功解析JSON")
break
except:
# 继续尝试下一行
pass
except Exception as e:
if not error_msg:
error_msg = f"方法2解析JSON失败: {e}"
logging.debug(error_msg)
# 方法3: 尝试使用正则表达式匹配最可能的JSON部分
if not json_obj:
try:
import re
# 尝试匹配 {..."title":...,"content":...}
json_pattern = r'\{[^{}]*"title"[^{}]*"content"[^{}]*\}'
matches = re.findall(json_pattern, result, re.DOTALL)
if matches:
for match in matches:
try:
processed_match = self._preprocess_for_json(match)
json_obj = json.loads(processed_match)
logging.info("方法3成功解析JSON")
break
except:
# 继续尝试下一个匹配
pass
except Exception as e:
if not error_msg:
error_msg = f"方法3解析JSON失败: {e}"
logging.debug(error_msg)
# 处理解析结果
if json_obj and isinstance(json_obj, dict):
# 验证关键字段
if "title" in json_obj and "content" in json_obj:
# 构建结果字典
result_dict = {
"judge_success": True,
"judged": True,
"title": json_obj["title"],
"content": json_obj["content"],
"title_base64": base64.b64encode(json_obj["title"].encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode(json_obj["content"].encode('utf-8')).decode('utf-8')
}
# 添加分析字段(如果存在)
if "analysis" in json_obj:
result_dict["analysis"] = json_obj["analysis"]
result_dict["analysis_base64"] = base64.b64encode(json_obj["analysis"].encode('utf-8')).decode('utf-8')
logging.info(f"成功提取内容: 标题({len(json_obj['title'])}字符), 内容({len(json_obj['content'])}字符)")
return result_dict
else:
# JSON对象缺少必要字段
logging.warning("解析的JSON缺少必要字段'title''content'")
error_msg = "缺少必要字段'title''content'"
# 保存错误日志
self._save_error_json(json.dumps(json_obj), error_msg, response_id)
else:
# 未找到有效的JSON
if error_msg:
logging.warning(f"JSON解析失败: {error_msg}")
else:
logging.warning("找不到有效的JSON结构")
# 保存可能的JSON字符串以供调试
if json_start >= 0 and json_end > json_start:
json_str = processed_result[json_start:json_end]
self._save_error_json(json_str, error_msg or "解析失败", response_id)
# 所有方法都失败,返回空内容
logging.info("内容审核过程未能产生有效结果,返回空内容")
empty_result = {
"judge_success": False,
"judged": True,
"title": "",
"content": "",
"title_base64": base64.b64encode("".encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode("".encode('utf-8')).decode('utf-8')
}
if error_msg:
empty_result["analysis"] = f"内容审核失败: {error_msg}"
empty_result["analysis_base64"] = base64.b64encode(f"内容审核失败: {error_msg}".encode('utf-8')).decode('utf-8')
return empty_result
except Exception as e:
# 捕获所有异常
error_traceback = traceback.format_exc()
logging.exception(f"审核过程中出错: {e}")
logging.debug(f"详细错误: {error_traceback}")
return {
"judge_success": False,
"judged": True,
"title": "",
"content": "",
"title_base64": base64.b64encode("".encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode("".encode('utf-8')).decode('utf-8'),
"analysis": f"内容审核过程出错: {e}",
"analysis_base64": base64.b64encode(f"内容审核过程出错: {e}".encode('utf-8')).decode('utf-8')
}
def _save_response(self, response, response_id):
"""保存原始响应"""
try:
response_log_dir = "/root/autodl-tmp/TravelContentCreator/log/judge_responses"
os.makedirs(response_log_dir, exist_ok=True)
with open(f"{response_log_dir}/response_{response_id}.txt", "w", encoding="utf-8") as f:
f.write(response)
except Exception as e:
logging.error(f"保存原始响应失败: {e}")
def _save_error_json(self, json_str, error, response_id):
"""保存错误的JSON字符串以供调试"""
try:
error_log_dir = "/root/autodl-tmp/TravelContentCreator/log/json_errors"
os.makedirs(error_log_dir, exist_ok=True)
# 创建包含错误信息和原始JSON的日志
error_info = {
"error_message": str(error),
"error_type": error.__class__.__name__ if hasattr(error, "__class__") else "Unknown",
"timestamp": int(time.time()),
"response_id": response_id,
"json_string": json_str
}
# 保存到文件
with open(f"{error_log_dir}/error_{response_id}.json", "w", encoding="utf-8") as f:
json.dump(error_info, f, ensure_ascii=False, indent=2)
logging.info(f"已保存错误JSON到 {error_log_dir}/error_{response_id}.json")
except Exception as e:
logging.error(f"保存错误JSON失败: {e}")
def _create_fallback_result(self, content, error_msg="解析失败"):
"""创建回退结果"""
if isinstance(content, str):
# 尝试解析内容字符串看是否是JSON字符串
try:
content_obj = json.loads(content)
title = content_obj.get("title", "")
content_text = content_obj.get("content", "")
except:
# 不是JSON字符串视为纯文本内容
title = "审核失败"
content_text = content
elif isinstance(content, dict):
# 已经是字典对象
title = content.get("title", "")
content_text = content.get("content", "")
else:
# 其他类型,创建空内容
title = "审核失败"
content_text = f"无法解析内容: {error_msg}"
return {
"judge_success": False,
"judged": True,
"title": title,
"content": content_text,
"title_base64": base64.b64encode(title.encode('utf-8')).decode('utf-8'),
"content_base64": base64.b64encode(content_text.encode('utf-8')).decode('utf-8'),
"analysis": f"内容审核失败: {error_msg}",
"analysis_base64": base64.b64encode(f"内容审核失败: {error_msg}".encode('utf-8')).decode('utf-8')
}
def _build_user_prompt(self, product_info, content_gen):
"""
构建用户提示词
Args:
product_info: 产品资料
content_gen: 需要审核的内容
Returns:
str: 构建好的用户提示词
"""
# 确保content_gen为字符串格式
if isinstance(content_gen, dict):
content_str = f"title: {content_gen.get('title', '')}\n\ncontent: {content_gen.get('content', '')}"
else:
content_str = str(content_gen)
return f"""
## 产品资料(真实信息,作为判断依据):
{product_info}
## 运营生成的文案(需要审核的内容):
{content_str}
"""
def judge_content_with_retry(self, product_info, content, max_retries=3, temperature=0.2, top_p=0.5, presence_penalty=0.0):
"""
带重试机制的内容审核方法,当检测到空内容时自动重试
Args:
product_info: 产品资料
content: 需要审核的内容
max_retries: 最大重试次数
temperature, top_p, presence_penalty: AI生成参数
Returns:
dict: 审核结果,如果所有重试都失败,则返回最后一次的失败结果
"""
retry_count = 0
last_result = None
logging.info(f"开始内容审核流程,最大重试次数: {max_retries},初始温度参数: {temperature}")
while retry_count <= max_retries:
current_attempt = retry_count + 1
if retry_count > 0:
# 每次重试增加温度参数,增加多样性
adjusted_temperature = min(temperature + (retry_count * 0.1), 0.9)
logging.info(f"🔄 内容审核重试 ({current_attempt}/{max_retries+1}),调整温度参数为: {adjusted_temperature:.2f}")
else:
adjusted_temperature = temperature
logging.info(f"⏳ 内容审核首次尝试 (1/{max_retries+1}),使用默认温度: {adjusted_temperature:.2f}")
# 调用基本的审核方法
result = self.judge_content(
product_info,
content,
temperature=adjusted_temperature,
top_p=top_p,
presence_penalty=presence_penalty
)
last_result = result
# 检查结果是否为空内容
if result.get("judge_success", False) and result.get("title") and result.get("content"):
# 成功获取有效内容,返回结果
if retry_count > 0:
logging.info(f"✅ 成功!在第{retry_count}次重试后获取有效内容(共尝试{current_attempt}次)")
else:
logging.info(f"✅ 成功!首次尝试已获取有效内容")
# 添加审核内容长度统计
title_len = len(result.get("title", ""))
content_len = len(result.get("content", ""))
logging.info(f"📊 审核结果统计:标题长度={title_len}字符,内容长度={content_len}字符")
return result
else:
# 记录当前尝试的结果状态
title_len = len(result.get("title", ""))
content_len = len(result.get("content", ""))
logging.warning(f"❌ 审核尝试 {current_attempt}/{max_retries+1} 失败judge_success={result.get('judge_success')},标题长度={title_len},内容长度={content_len}")
# 重试次数增加
retry_count += 1
if retry_count <= max_retries:
# 在重试前稍微等待,避免过快请求
delay = 1 + random.random() * 2 # 1-3秒随机延迟
remaining = max_retries - retry_count + 1
logging.info(f"⏱️ 等待{delay:.1f}秒后进行第{retry_count+1}次尝试,剩余{remaining}次尝试机会")
time.sleep(delay)
else:
logging.warning(f"⛔ 已达到最大重试次数,共尝试{current_attempt}次均未获取满意结果")
# 所有重试都失败,返回最后一次结果
logging.warning(f"⚠️ {max_retries+1}次尝试后仍未获取有效内容,将返回最后一次结果")
# 记录最后返回内容的基本信息
title_len = len(last_result.get("title", ""))
content_len = len(last_result.get("content", ""))
logging.info(f"📄 最终返回内容judge_success={last_result.get('judge_success')},标题长度={title_len}字符,内容长度={content_len}字符")
return last_result