Compare commits

..

2 Commits

Author SHA1 Message Date
8a59daae6c 修复了提示词中数据库调度问题 2025-07-25 12:05:31 +08:00
34d43fab29 修改了服务器的查询 2025-07-25 10:59:54 +08:00
10 changed files with 650 additions and 863 deletions

View File

@ -53,73 +53,40 @@ router = APIRouter(
)
def _resolve_ids_to_names_with_mapping(db_service: DatabaseService,
styleIds: Optional[List[int]] = None,
audienceIds: Optional[List[int]] = None,
scenicSpotIds: Optional[List[int]] = None,
productIds: Optional[List[int]] = None) -> tuple:
def _resolve_ids_to_objects(db_service: DatabaseService,
styleIds: Optional[List[int]] = None,
audienceIds: Optional[List[int]] = None,
scenicSpotIds: Optional[List[int]] = None,
productIds: Optional[List[int]] = None) -> tuple:
"""
将ID列表转换为名称列表并返回ID到名称的映射关系
Args:
db_service: 数据库服务
styleIds: 风格ID列表
audienceIds: 受众ID列表
scenicSpotIds: 景区ID列表
productIds: 产品ID列表
Returns:
(styles, audiences, scenic_spots, products, id_name_mappings) 名称列表和映射关系元组
将ID列表解析为完整的对象记录列表并返回ID到名称的映射
"""
styles = []
audiences = []
scenic_spots = []
products = []
styles, audiences, scenic_spots, products = [], [], [], []
# 建立ID到名称的映射字典
id_name_mappings = {
'style_mapping': {}, # {name: id}
'audience_mapping': {}, # {name: id}
'scenic_spot_mapping': {}, # {name: id}
'product_mapping': {} # {name: id}
'style_mapping': {}, 'audience_mapping': {},
'scenic_spot_mapping': {}, 'product_mapping': {}
}
# 如果数据库服务不可用,返回空列表
if not db_service or not db_service.is_available():
logger.warning("数据库服务不可用无法解析ID")
return styles, audiences, scenic_spots, products, id_name_mappings
# 解析风格ID
if styleIds:
style_records = db_service.get_styles_by_ids(styleIds)
for record in style_records:
style_name = record['styleName']
styles.append(style_name)
id_name_mappings['style_mapping'][style_name] = record['id']
styles = db_service.get_styles_by_ids(styleIds)
id_name_mappings['style_mapping'] = {record['styleName']: record['id'] for record in styles}
# 解析受众ID
if audienceIds:
audience_records = db_service.get_audiences_by_ids(audienceIds)
for record in audience_records:
audience_name = record['audienceName']
audiences.append(audience_name)
id_name_mappings['audience_mapping'][audience_name] = record['id']
audiences = db_service.get_audiences_by_ids(audienceIds)
id_name_mappings['audience_mapping'] = {record['audienceName']: record['id'] for record in audiences}
# 解析景区ID
if scenicSpotIds:
spot_records = db_service.get_scenic_spots_by_ids(scenicSpotIds)
for record in spot_records:
spot_name = record['name']
scenic_spots.append(spot_name)
id_name_mappings['scenic_spot_mapping'][spot_name] = record['id']
scenic_spots = db_service.get_scenic_spots_by_ids(scenicSpotIds)
id_name_mappings['scenic_spot_mapping'] = {record['name']: record['id'] for record in scenic_spots}
# 解析产品ID
if productIds:
product_records = db_service.get_products_by_ids(productIds)
for record in product_records:
product_name = record['productName'] # 修改这里从name改为productName
products.append(product_name)
id_name_mappings['product_mapping'][product_name] = record['id']
products = db_service.get_products_by_ids(productIds)
id_name_mappings['product_mapping'] = {record['productName']: record['id'] for record in products}
return styles, audiences, scenic_spots, products, id_name_mappings
@ -141,7 +108,7 @@ def _resolve_ids_to_names(db_service: DatabaseService,
Returns:
(styles, audiences, scenic_spots, products) 名称列表元组
"""
styles, audiences, scenic_spots, products, _ = _resolve_ids_to_names_with_mapping(
styles, audiences, scenic_spots, products, _ = _resolve_ids_to_objects(
db_service, styleIds, audienceIds, scenicSpotIds, productIds
)
return styles, audiences, scenic_spots, products
@ -213,8 +180,8 @@ async def generate_topics(
- **productIds**: 产品ID列表
"""
try:
# 将ID转换为名称,并获取映射关系
styles, audiences, scenic_spots, products, id_name_mappings = _resolve_ids_to_names_with_mapping(
# 将ID解析为完整的对象记录
styles, audiences, scenic_spots, products, id_name_mappings = _resolve_ids_to_objects(
db_service,
request.styleIds,
request.audienceIds,
@ -222,16 +189,26 @@ async def generate_topics(
request.productIds
)
# 从对象中提取名称列表,用于向后兼容
style_names = [s['styleName'] for s in styles]
audience_names = [a['audienceName'] for a in audiences]
scenic_spot_names = [s['name'] for s in scenic_spots]
product_names = [p['productName'] for p in products]
request_id, topics = await tweet_service.generate_topics(
dates=request.dates,
numTopics=request.numTopics,
styles=styles,
audiences=audiences,
scenic_spots=scenic_spots,
products=products
styles=style_names,
audiences=audience_names,
scenic_spots=scenic_spot_names,
products=product_names,
# 传递完整的对象以供下游使用
style_objects=styles,
audience_objects=audiences,
scenic_spot_objects=scenic_spots,
product_objects=products
)
# 为topics添加ID字段
enriched_topics = _add_ids_to_topics(topics, id_name_mappings)
return TopicResponse(
@ -260,8 +237,8 @@ async def generate_content(
- **autoJudge**: 是否自动进行内容审核
"""
try:
# 将ID转换为名称
styles, audiences, scenic_spots, products = _resolve_ids_to_names(
# 将ID解析为完整的对象记录
styles, audiences, scenic_spots, products, _ = _resolve_ids_to_objects(
db_service,
request.styleIds,
request.audienceIds,
@ -271,17 +248,14 @@ async def generate_content(
request_id, topic_index, content = await tweet_service.generate_content(
topic=request.topic,
styles=styles,
audiences=audiences,
scenic_spots=scenic_spots,
products=products,
style_objects=styles,
audience_objects=audiences,
scenic_spot_objects=scenic_spots,
product_objects=products,
autoJudge=request.autoJudge
)
# 提取judgeSuccess字段从content中移除以避免重复
judge_success = None
if isinstance(content, dict) and 'judgeSuccess' in content:
judge_success = content.pop('judgeSuccess')
judge_success = content.pop('judgeSuccess', None) if isinstance(content, dict) else None
return ContentResponse(
requestId=request_id,
@ -341,8 +315,8 @@ async def judge_content(
- **productIds**: 产品ID列表
"""
try:
# 将ID转换为名称
styles, audiences, scenic_spots, products = _resolve_ids_to_names(
# 将ID解析为完整的对象记录
styles, audiences, scenic_spots, products, _ = _resolve_ids_to_objects(
db_service,
request.styleIds,
request.audienceIds,
@ -353,10 +327,10 @@ async def judge_content(
request_id, topic_index, judged_content, judge_success = await tweet_service.judge_content(
topic=request.topic,
content=request.content,
styles=styles,
audiences=audiences,
scenic_spots=scenic_spots,
products=products
style_objects=styles,
audience_objects=audiences,
scenic_spot_objects=scenic_spots,
product_objects=products
)
return JudgeResponse(

File diff suppressed because it is too large Load Diff

View File

@ -57,53 +57,35 @@ class PromptBuilderService:
def build_content_prompt(self, topic: Dict[str, Any], step: str = "content") -> Tuple[str, str]:
"""
构建内容生成提示词
Args:
topic: 选题信息
step: 当前步骤用于过滤参考内容
Returns:
系统提示词和用户提示词的元组
构建内容生成提示词 (已重构)
此方法现在依赖于一个预先填充好完整信息的topic对象
"""
# 获取内容生成配置
content_config = self._ensure_content_config()
# 加载系统提示词和用户提示词模板
system_prompt_path = content_config.content_system_prompt
user_prompt_path = content_config.content_user_prompt
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 获取风格内容
style_filename = topic.get("style", "")
style_content = self.prompt_service.get_style_content(style_filename)
# 获取目标受众内容
demand_filename = topic.get("targetAudience", "")
demand_content = self.prompt_service.get_audience_content(demand_filename)
# 获取景区信息
object_name = topic.get("object", "")
object_content = self.prompt_service.get_scenic_spot_info(object_name)
# 获取产品信息
product_name = topic.get("product", "")
product_content = self.prompt_service.get_product_info(product_name)
# 获取参考内容
template = PromptTemplate(content_config.content_system_prompt, content_config.content_user_prompt)
# 从预填充的topic对象中直接获取信息不再调用prompt_service
style_obj = topic.get('style_object', {})
style_content = f"{style_obj.get('styleName', '')}\n{style_obj.get('description', '')}"
audience_obj = topic.get('audience_object', {})
demand_content = f"{audience_obj.get('audienceName', '')}\n{audience_obj.get('description', '')}"
spot_obj = topic.get('scenic_spot_object', {})
object_content = f"{spot_obj.get('name', '')}\n{spot_obj.get('description', '')}"
product_obj = topic.get('product_object', {})
product_content = f"{product_obj.get('productName', '')}\n{product_obj.get('detailedDescription', '')}"
# 获取通用的参考内容
refer_content = self.prompt_service.get_refer_content(step)
# 构建系统提示词
system_prompt = template.get_system_prompt()
# 构建用户提示词
user_prompt = template.build_user_prompt(
style_content=f"{style_filename}\n{style_content}",
demand_content=f"{demand_filename}\n{demand_content}",
object_content=f"{object_name}\n{object_content}",
product_content=f"{product_name}\n{product_content}",
style_content=style_content,
demand_content=demand_content,
object_content=object_content,
product_content=product_content,
refer_content=refer_content
)
@ -229,7 +211,15 @@ class PromptBuilderService:
return system_prompt, user_prompt
def build_topic_prompt(self, products: Optional[List[str]] = None, scenic_spots: Optional[List[str]] = None, styles: Optional[List[str]] = None, audiences: Optional[List[str]] = None, dates: Optional[str] = None, numTopics: int = 5) -> Tuple[str, str]:
def build_topic_prompt(self, products: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
dates: Optional[str] = None, numTopics: int = 5,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str]:
"""
构建选题生成提示词
@ -253,59 +243,50 @@ class PromptBuilderService:
if not system_prompt_path or not user_prompt_path:
raise ValueError("选题提示词模板路径不完整")
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 处理日期
if dates:
if ' to ' in dates:
start_date, end_date = dates.split(' to ')
month = f"{start_date}{end_date}"
elif ',' in dates:
month = ', '.join(dates.split(','))
else:
month = dates
else:
month = ''
# 获取风格内容
style_content = ''
if styles:
month = dates or ''
if dates and ' to ' in dates:
start_date, end_date = dates.split(' to ')
month = f"{start_date}{end_date}"
elif dates and ',' in dates:
month = ', '.join(dates.split(','))
# 使用传入的完整对象构建内容,避免重复查询
if style_objects:
style_content = '\n'.join([f"{obj['styleName']}: {obj.get('description', '')}" for obj in style_objects])
elif styles:
style_content = '\n'.join([f"{style}: {self.prompt_service.get_style_content(style)}" for style in styles])
else:
all_styles = self.prompt_service.get_all_styles()
style_content = "Style文件列表:\n" + "\n".join([f"- {style['name']}" for style in all_styles])
# 获取受众内容
demand_content = ''
if audiences:
if audience_objects:
demand_content = '\n'.join([f"{obj['audienceName']}: {obj.get('description', '')}" for obj in audience_objects])
elif audiences:
demand_content = '\n'.join([f"{audience}: {self.prompt_service.get_audience_content(audience)}" for audience in audiences])
else:
all_audiences = self.prompt_service.get_all_audiences()
demand_content = "Demand文件列表:\n" + "\n".join([f"- {audience['name']}" for audience in all_audiences])
# 获取参考内容
refer_content = self.prompt_service.get_refer_content("topic")
# 获取景区内容
object_content = ''
if scenic_spots:
if scenic_spot_objects:
object_content = '\n'.join([f"{obj['name']}: {obj.get('description', '')}" for obj in scenic_spot_objects])
elif scenic_spots:
object_content = '\n'.join([f"{spot}: {self.prompt_service.get_scenic_spot_info(spot)}" for spot in scenic_spots])
else:
all_spots = self.prompt_service.get_all_scenic_spots()
object_content = "Object信息:\n" + "\n".join([f"- {spot['name']}" for spot in all_spots])
# 获取产品内容
product_content = ''
if products:
if product_objects:
product_content = '\n'.join([f"{obj['productName']}: {obj.get('detailedDescription', '')}" for obj in product_objects])
elif products:
product_content = '\n'.join([f"{product}: {self.prompt_service.get_product_info(product)}" for product in products])
else:
product_content = '' # 假设没有默认产品列表
# 构建系统提示词
product_content = ''
refer_content = self.prompt_service.get_refer_content("topic")
system_prompt = template.get_system_prompt()
# 构建创作资料
creative_materials = (
f"你拥有的创作资料如下:\n"
f"风格信息:\n{style_content}\n\n"
@ -315,7 +296,6 @@ class PromptBuilderService:
f"产品信息:\n{product_content}"
)
# 构建用户提示词
user_prompt = template.build_user_prompt(
creative_materials=creative_materials,
numTopics=numTopics,
@ -326,44 +306,25 @@ class PromptBuilderService:
def build_judge_prompt(self, topic: Dict[str, Any], content: Dict[str, Any]) -> Tuple[str, str]:
"""
构建内容审核提示词
Args:
topic: 选题信息
content: 生成的内容
Returns:
系统提示词和用户提示词的元组
构建内容审核提示词 (已重构)
此方法现在依赖于一个预先填充好完整信息的topic对象
"""
# 获取内容生成配置
content_config = self._ensure_content_config()
# 从配置中获取审核提示词模板路径
system_prompt_path = content_config.judger_system_prompt
user_prompt_path = content_config.judger_user_prompt
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 获取景区信息
object_name = topic.get("object", "")
object_content = self.prompt_service.get_scenic_spot_info(object_name)
# 获取产品信息
product_name = topic.get("product", "")
product_content = self.prompt_service.get_product_info(product_name)
# 获取参考内容
template = PromptTemplate(content_config.judger_system_prompt, content_config.judger_user_prompt)
# 从预填充的topic对象中直接获取信息
spot_obj = topic.get('scenic_spot_object', {})
object_content = f"{spot_obj.get('name', '')}\n{spot_obj.get('description', '')}"
product_obj = topic.get('product_object', {})
product_content = f"{product_obj.get('productName', '')}\n{product_obj.get('detailedDescription', '')}"
refer_content = self.prompt_service.get_refer_content("judge")
# 构建系统提示词
system_prompt = template.get_system_prompt()
# 格式化内容
import json
tweet_content = json.dumps(content, ensure_ascii=False, indent=4)
# 构建用户提示词
user_prompt = template.build_user_prompt(
tweet_content=tweet_content,
object_content=object_content,
@ -371,7 +332,7 @@ class PromptBuilderService:
refer_content=refer_content
)
return system_prompt, user_prompt
return system_prompt, user_prompt
def build_judge_prompt_with_params(self, topic: Dict[str, Any], content: Dict[str, Any],
styles: Optional[List[str]] = None,

View File

@ -137,19 +137,18 @@ class PromptService:
def get_style_content(self, styleName: str) -> str:
"""获取风格提示词内容"""
# 尝试从数据库获取
if self.db_pool:
try:
with self.db_pool.get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT description FROM contentStyle WHERE styleName = %s",
(styleName,)
)
result = cursor.fetchone()
if result:
logger.info(f"从数据库获取风格提示词: {styleName}")
return result['description']
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT description FROM contentStyle WHERE styleName = %s",
(styleName,)
)
result = cursor.fetchone()
if result:
logger.info(f"从数据库获取风格提示词: {styleName}")
return result['description']
except Exception as e:
logger.error(f"从数据库获取风格提示词失败: {e}")
@ -177,19 +176,16 @@ class PromptService:
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT description FROM targetAudience WHERE audienceName = %s",
(audience_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"从数据库获取受众提示词: {audience_name}")
return result["description"]
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT description FROM targetAudience WHERE audienceName = %s",
(audience_name,)
)
result = cursor.fetchone()
if result:
logger.info(f"从数据库获取受众提示词: {audience_name}")
return result["description"]
except Exception as e:
logger.error(f"从数据库获取受众提示词失败: {e}")
@ -233,22 +229,18 @@ class PromptService:
Returns:
景区信息内容
"""
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT description FROM scenicSpot WHERE name = %s",
(spot_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"从数据库获取景区信息: {spot_name}")
return result["description"]
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT description FROM scenicSpot WHERE name = %s",
(spot_name,)
)
result = cursor.fetchone()
if result:
logger.info(f"从数据库获取景区信息: {spot_name}")
return result["description"]
except Exception as e:
logger.error(f"从数据库获取景区信息失败: {e}")
@ -280,22 +272,18 @@ class PromptService:
Returns:
产品信息内容
"""
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT detailedDescription FROM product WHERE productName = %s",
(product_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"从数据库获取产品信息: {product_name}")
return result["detailedDescription"]
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT detailedDescription FROM product WHERE productName = %s",
(product_name,)
)
result = cursor.fetchone()
if result:
logger.info(f"从数据库获取产品信息: {product_name}")
return result["detailedDescription"]
except Exception as e:
logger.error(f"从数据库获取产品信息失败: {e}")
@ -400,16 +388,13 @@ class PromptService:
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT styleName as name, description FROM contentStyle")
results = cursor.fetchall()
cursor.close()
conn.close()
if results:
logger.info(f"从数据库获取所有风格: {len(results)}")
return results
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT styleName as name, description FROM contentStyle")
results = cursor.fetchall()
if results:
logger.info(f"从数据库获取所有风格: {len(results)}")
return results
except Exception as e:
logger.error(f"从数据库获取所有风格失败: {e}")
@ -447,16 +432,13 @@ class PromptService:
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT audienceName as name, description FROM targetAudience")
results = cursor.fetchall()
cursor.close()
conn.close()
if results:
logger.info(f"从数据库获取所有受众: {len(results)}")
return results
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT audienceName as name, description FROM targetAudience")
results = cursor.fetchall()
if results:
logger.info(f"从数据库获取所有受众: {len(results)}")
return results
except Exception as e:
logger.error(f"从数据库获取所有受众失败: {e}")
@ -494,19 +476,16 @@ class PromptService:
# 优先从数据库获取
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT name as name, description FROM scenicSpot")
results = cursor.fetchall()
cursor.close()
conn.close()
if results:
logger.info(f"从数据库获取所有景区: {len(results)}")
return [{
"name": item["name"],
"description": item["description"][:100] + "..." if len(item["description"]) > 100 else item["description"]
} for item in results]
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT name as name, description FROM scenicSpot")
results = cursor.fetchall()
if results:
logger.info(f"从数据库获取所有景区: {len(results)}")
return [{
"name": item["name"],
"description": item["description"][:100] + "..." if len(item["description"]) > 100 else item["description"]
} for item in results]
except Exception as e:
logger.error(f"从数据库获取所有景区失败: {e}")
@ -543,34 +522,32 @@ class PromptService:
# 优先保存到数据库
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
# 检查是否存在
cursor.execute(
"SELECT COUNT(*) FROM contentStyle WHERE styleName = %s",
(name,)
)
count = cursor.fetchone()[0]
if count > 0:
# 更新
cursor.execute(
"UPDATE contentStyle SET description = %s WHERE styleName = %s",
(description, name)
)
else:
# 插入
cursor.execute(
"INSERT INTO contentStyle (styleName, description) VALUES (%s, %s)",
(name, description)
)
conn.commit()
cursor.close()
conn.close()
logger.info(f"风格保存到数据库成功: {name}")
return True
with self.db_pool.get_connection() as conn:
with conn.cursor() as cursor:
# 检查是否存在
cursor.execute(
"SELECT COUNT(*) FROM contentStyle WHERE styleName = %s",
(name,)
)
count = cursor.fetchone()[0]
if count > 0:
# 更新
cursor.execute(
"UPDATE contentStyle SET description = %s WHERE styleName = %s",
(description, name)
)
else:
# 插入
cursor.execute(
"INSERT INTO contentStyle (styleName, description) VALUES (%s, %s)",
(name, description)
)
conn.commit()
logger.info(f"风格保存到数据库成功: {name}")
return True
except Exception as e:
logger.error(f"风格保存到数据库失败: {e}")
@ -609,34 +586,32 @@ class PromptService:
# 优先保存到数据库
if self.db_pool:
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
# 检查是否存在
cursor.execute(
"SELECT COUNT(*) FROM targetAudience WHERE audienceName = %s",
(name,)
)
count = cursor.fetchone()[0]
if count > 0:
# 更新
cursor.execute(
"UPDATE targetAudience SET description = %s WHERE audienceName = %s",
(description, name)
)
else:
# 插入
cursor.execute(
"INSERT INTO targetAudience (audienceName, description) VALUES (%s, %s)",
(name, description)
)
conn.commit()
cursor.close()
conn.close()
logger.info(f"受众保存到数据库成功: {name}")
return True
with self.db_pool.get_connection() as conn:
with conn.cursor() as cursor:
# 检查是否存在
cursor.execute(
"SELECT COUNT(*) FROM targetAudience WHERE audienceName = %s",
(name,)
)
count = cursor.fetchone()[0]
if count > 0:
# 更新
cursor.execute(
"UPDATE targetAudience SET description = %s WHERE audienceName = %s",
(description, name)
)
else:
# 插入
cursor.execute(
"INSERT INTO targetAudience (audienceName, description) VALUES (%s, %s)",
(name, description)
)
conn.commit()
logger.info(f"受众保存到数据库成功: {name}")
return True
except Exception as e:
logger.error(f"受众保存到数据库失败: {e}")

View File

@ -48,11 +48,15 @@ class TweetService:
self.prompt_service = PromptService(config_manager)
self.prompt_builder = PromptBuilderService(config_manager, self.prompt_service)
async def generate_topics(self, dates: Optional[str] = None, numTopics: int = 5,
styles: Optional[List[str]] = None,
async def generate_topics(self, dates: Optional[str] = None, numTopics: int = 5,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None) -> Tuple[str, List[Dict[str, Any]]]:
products: Optional[List[str]] = None,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, List[Dict[str, Any]]]:
"""
生成选题
@ -63,6 +67,10 @@ class TweetService:
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
style_objects: 风格对象列表
audience_objects: 受众对象列表
scenic_spot_objects: 景区对象列表
product_objects: 产品对象列表
Returns:
请求ID和生成的选题列表
@ -82,7 +90,11 @@ class TweetService:
styles=styles,
audiences=audiences,
dates=dates,
numTopics=numTopics
numTopics=numTopics,
style_objects=style_objects,
audience_objects=audience_objects,
scenic_spot_objects=scenic_spot_objects,
product_objects=product_objects
)
# 使用预构建的提示词生成选题
@ -97,11 +109,11 @@ class TweetService:
logger.info(f"选题生成完成请求ID: {requestId}, 数量: {len(topics)}")
return requestId, topics
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None,
autoJudge: bool = False) -> Tuple[str, str, Dict[str, Any]]:
"""
为选题生成内容
@ -117,25 +129,28 @@ class TweetService:
Returns:
请求ID选题索引和生成的内容包含judgeSuccess状态
"""
# 如果没有提供topic创建一个基础的topic
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
topicIndex = topic.get('index', 'N/A')
logger.info(f"开始为选题 {topicIndex} 生成内容{'(含审核)' if autoJudge else ''}")
# 创建topic的副本并应用覆盖参数
# 核心修改创建一个增强版的topic将所有需要的信息预先填充好
enhanced_topic = topic.copy()
if styles and len(styles) > 0:
enhanced_topic['style'] = styles[0] # 使用第一个风格
if audiences and len(audiences) > 0:
enhanced_topic['targetAudience'] = audiences[0] # 使用第一个受众
if scenic_spots and len(scenic_spots) > 0:
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
if products and len(products) > 0:
enhanced_topic['product'] = products[0] # 使用第一个产品
# 使用PromptBuilderService构建提示词
if style_objects:
enhanced_topic['style_object'] = style_objects[0]
enhanced_topic['style'] = style_objects[0].get('styleName')
if audience_objects:
enhanced_topic['audience_object'] = audience_objects[0]
enhanced_topic['targetAudience'] = audience_objects[0].get('audienceName')
if scenic_spot_objects:
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
enhanced_topic['object'] = scenic_spot_objects[0].get('name')
if product_objects:
enhanced_topic['product_object'] = product_objects[0]
enhanced_topic['product'] = product_objects[0].get('productName')
# 使用PromptBuilderService构建提示词现在它只需要enhanced_topic
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
# 使用预构建的提示词生成内容
@ -207,61 +222,42 @@ class TweetService:
return requestId, topicIndex, content
async def judge_content(self, topic: Optional[Dict[str, Any]] = None, content: Dict[str, Any] = {},
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None) -> Tuple[str, str, Dict[str, Any], bool]:
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str, Dict[str, Any], bool]:
"""
审核内容
Args:
topic: 选题信息
content: 要审核的内容
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
Returns:
请求ID选题索引审核后的内容和审核是否成功
审核内容 (已重构)
"""
# 如果没有提供topic创建一个基础的topic
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
# 如果没有提供content返回错误
if not content:
content = {"title": "未提供内容", "content": "未提供内容"}
topicIndex = topic.get('index', 'unknown')
logger.info(f"开始审核选题 {topicIndex} 的内容")
# 创建topic的副本并应用覆盖参数
# 构建包含所有预取信息的enhanced_topic
enhanced_topic = topic.copy()
if styles and len(styles) > 0:
enhanced_topic['style'] = styles[0] # 使用第一个风格
if audiences and len(audiences) > 0:
enhanced_topic['targetAudience'] = audiences[0] # 使用第一个受众
if scenic_spots and len(scenic_spots) > 0:
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
if products and len(products) > 0:
enhanced_topic['product'] = products[0] # 使用第一个产品
# 使用PromptBuilderService构建提示词
if style_objects:
enhanced_topic['style_object'] = style_objects[0]
if audience_objects:
enhanced_topic['audience_object'] = audience_objects[0]
if scenic_spot_objects:
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
if product_objects:
enhanced_topic['product_object'] = product_objects[0]
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(enhanced_topic, content)
# 使用预构建的提示词进行审核
judged_data = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, system_prompt, user_prompt)
# 提取审核是否成功content_judger返回judge_success字段
judgeSuccess = judged_data.get('judge_success', False)
# 统一字段命名移除judge_success添加judgeSuccess
if 'judge_success' in judged_data:
judged_data = {k: v for k, v in judged_data.items() if k != 'judge_success'}
judged_data['judgeSuccess'] = judgeSuccess
# 生成请求ID
requestId = f"judge-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
logger.info(f"内容审核完成请求ID: {requestId}, 选题索引: {topicIndex}, 审核结果: {judgeSuccess}")