754 lines
35 KiB
Python
Raw Normal View History

2025-07-10 17:51:37 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文字内容服务层
封装现有功能提供API调用
"""
import logging
import uuid
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
2025-08-04 13:43:52 +08:00
import re
from difflib import SequenceMatcher
2025-07-10 17:51:37 +08:00
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
from core.ai import AIAgent
from utils.file_io import OutputManager
from tweet.topic_generator import TopicGenerator
from tweet.content_generator import ContentGenerator
from tweet.content_judger import ContentJudger
from api.services.prompt_builder import PromptBuilderService
from api.services.prompt_service import PromptService
2025-07-29 20:37:43 +08:00
from api.services.database_service import DatabaseService
2025-07-10 17:51:37 +08:00
logger = logging.getLogger(__name__)
2025-08-04 13:43:52 +08:00
class TopicIDMappingManager:
"""选题ID映射管理器 - 专门处理选题中的对象ID映射"""
def __init__(self):
"""初始化映射管理器"""
self.name_to_id = {} # 名称 -> ID映射
self.mapping_data = {
'styles': {},
'audiences': {},
'scenic_spots': {},
'products': {}
}
def add_objects_mapping(self,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None):
"""
批量添加对象映射关系
Args:
style_objects: 风格对象列表
audience_objects: 受众对象列表
scenic_spot_objects: 景区对象列表
product_objects: 产品对象列表
"""
logger.info("开始建立选题ID映射关系")
mapping_count = 0
if style_objects:
logger.info(f"处理 {len(style_objects)} 个风格对象")
for obj in style_objects:
name = obj.get('styleName', '')
obj_id = obj.get('id')
if name and obj_id:
self.mapping_data['styles'][name] = str(obj_id)
self._add_name_variants('styles', name, str(obj_id))
mapping_count += 1
logger.info(f"添加风格映射: {name} -> ID {obj_id}")
if audience_objects:
logger.info(f"处理 {len(audience_objects)} 个受众对象")
for obj in audience_objects:
name = obj.get('audienceName', '')
obj_id = obj.get('id')
if name and obj_id:
self.mapping_data['audiences'][name] = str(obj_id)
self._add_name_variants('audiences', name, str(obj_id))
mapping_count += 1
logger.info(f"添加受众映射: {name} -> ID {obj_id}")
if scenic_spot_objects:
logger.info(f"处理 {len(scenic_spot_objects)} 个景区对象")
for obj in scenic_spot_objects:
name = obj.get('name', '')
obj_id = obj.get('id')
if name and obj_id:
self.mapping_data['scenic_spots'][name] = str(obj_id)
self._add_name_variants('scenic_spots', name, str(obj_id))
mapping_count += 1
logger.info(f"添加景区映射: {name} -> ID {obj_id}")
if product_objects:
logger.info(f"处理 {len(product_objects)} 个产品对象")
for obj in product_objects:
name = obj.get('productName', '')
obj_id = obj.get('id')
if name and obj_id:
self.mapping_data['products'][name] = str(obj_id)
self._add_name_variants('products', name, str(obj_id))
mapping_count += 1
logger.info(f"添加产品映射: {name} -> ID {obj_id}")
total_variants = len(self.name_to_id)
logger.info(f"ID映射关系建立完成: {mapping_count} 个对象,生成 {total_variants} 个名称变体")
def _add_name_variants(self, category: str, name: str, obj_id: str):
"""为名称添加各种变体以支持模糊匹配"""
variants = [name]
# 去除标点符号版本
clean_name = re.sub(r'[^\w\s]', '', name)
if clean_name != name:
variants.append(clean_name)
# 去除空格版本
no_space_name = name.replace(' ', '')
if no_space_name != name:
variants.append(no_space_name)
# 添加简化版本(去除常见后缀)
simplified = re.sub(r'(风格|类型|系列|产品|景区|公园)$', '', name)
if simplified != name and simplified:
variants.append(simplified)
# 为所有变体添加映射
for variant in variants:
self.name_to_id[variant.lower()] = {
'category': category,
'id': obj_id,
'original_name': name
}
# 记录生成的变体(仅在有多个变体时记录)
if len(variants) > 1:
variants_str = ', '.join(f"'{v}'" for v in variants)
logger.info(f"'{name}' 生成 {len(variants)} 个变体: {variants_str}")
def find_ids_in_topic(self, topic: Dict[str, Any]) -> Dict[str, List[str]]:
"""
在选题中查找相关的对象ID
Args:
topic: 选题字典
Returns:
按类型分组的ID列表
"""
topic_index = topic.get('index', 'N/A')
found_ids = {
'style_ids': [],
'audience_ids': [],
'scenic_spot_ids': [],
'product_ids': []
}
# 提取选题中的所有文本
topic_text = self._extract_topic_text(topic)
topic_text_lower = topic_text.lower()
logger.info(f"开始为选题 {topic_index} 进行ID匹配")
logger.info(f"选题 {topic_index} 提取的文本内容: '{topic_text}'")
logger.info(f"当前映射库包含 {len(self.name_to_id)} 个名称变体")
# 显示映射库内容
logger.info("映射库内容:")
for variant, info in self.name_to_id.items():
logger.info(f" '{variant}' -> {info['original_name']} ({info['category']}, ID: {info['id']})")
# 记录匹配过程
match_details = []
# 在文本中查找匹配的名称
for name_variant, mapping_info in self.name_to_id.items():
if name_variant in topic_text_lower:
category = mapping_info['category']
obj_id = mapping_info['id']
original_name = mapping_info['original_name']
# 记录匹配详情
match_details.append({
'variant': name_variant,
'original_name': original_name,
'category': category,
'id': obj_id
})
# 映射到返回格式
if category == 'styles' and obj_id not in found_ids['style_ids']:
found_ids['style_ids'].append(obj_id)
logger.info(f"选题 {topic_index} 匹配到风格: '{name_variant}' -> {original_name} (ID: {obj_id})")
elif category == 'audiences' and obj_id not in found_ids['audience_ids']:
found_ids['audience_ids'].append(obj_id)
logger.info(f"选题 {topic_index} 匹配到受众: '{name_variant}' -> {original_name} (ID: {obj_id})")
elif category == 'scenic_spots' and obj_id not in found_ids['scenic_spot_ids']:
found_ids['scenic_spot_ids'].append(obj_id)
logger.info(f"选题 {topic_index} 匹配到景区: '{name_variant}' -> {original_name} (ID: {obj_id})")
elif category == 'products' and obj_id not in found_ids['product_ids']:
found_ids['product_ids'].append(obj_id)
logger.info(f"选题 {topic_index} 匹配到产品: '{name_variant}' -> {original_name} (ID: {obj_id})")
# 统计匹配结果
total_found = sum(len(ids) for ids in found_ids.values())
total_available = len(set(info['category'] + '_' + info['id'] for info in self.name_to_id.values()))
match_rate = (total_found / total_available * 100) if total_available > 0 else 0
# 输出详细的匹配结果
if total_found > 0:
logger.info(f"选题 {topic_index} ID匹配完成: 找到 {total_found} 个相关对象,匹配率: {match_rate:.1f}%")
logger.info(f"选题 {topic_index} 匹配详情: {match_details}")
# 按类别显示匹配结果
for category, ids in found_ids.items():
if ids:
category_name = {
'style_ids': '风格',
'audience_ids': '受众',
'scenic_spot_ids': '景区',
'product_ids': '产品'
}.get(category, category)
# 获取匹配的原始名称
matched_names = []
# 建立正确的分类映射关系
category_mapping = {
'style_ids': 'styles',
'audience_ids': 'audiences',
'scenic_spot_ids': 'scenic_spots',
'product_ids': 'products'
}
target_category = category_mapping.get(category)
logger.info(f"选题 {topic_index} 处理分类 {category} -> {target_category}, IDs: {ids}")
for detail in match_details:
if detail['id'] in ids and detail['category'] == target_category:
matched_names.append(f"{detail['original_name']}({detail['id']})")
# 显示分类结果
names_str = ', '.join(matched_names) if matched_names else '(无匹配名称)'
logger.info(f" {category_name}: {names_str}")
# 如果没有匹配名称,输出调试信息
if not matched_names:
logger.info(f"选题 {topic_index} {category_name}无匹配名称,详情检查:")
logger.info(f" IDs: {ids}")
logger.info(f" 目标分类: {target_category}")
for detail in match_details:
if detail['id'] in ids:
logger.info(f" 详情: {detail}")
else:
logger.info(f"选题 {topic_index} 未匹配到任何相关对象ID")
return found_ids
def _extract_topic_text(self, topic: Dict[str, Any]) -> str:
"""提取选题中的所有文本内容"""
text_parts = []
# 提取主要字段的文本
text_fields = ['object', 'style', 'targetAudience', 'product', 'logic',
'productLogic', 'styleLogic', 'targetAudienceLogic']
for field in text_fields:
if field in topic and topic[field]:
text_parts.append(str(topic[field]))
return ' '.join(text_parts)
2025-07-10 17:51:37 +08:00
class TweetService:
"""文字内容服务类"""
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
"""
初始化文字内容服务
Args:
ai_agent: AI代理
config_manager: 配置管理器
output_manager: 输出管理器
"""
self.ai_agent = ai_agent
self.config_manager = config_manager
self.output_manager = output_manager
# 初始化各个组件
self.topic_generator = TopicGenerator(ai_agent, config_manager, output_manager)
self.content_generator = ContentGenerator(ai_agent, config_manager, output_manager)
self.content_judger = ContentJudger(ai_agent, config_manager, output_manager)
# 初始化提示词服务和构建器
self.prompt_service = PromptService(config_manager)
self.prompt_builder = PromptBuilderService(config_manager, self.prompt_service)
2025-08-04 13:43:52 +08:00
# 初始化选题ID映射管理器
self.topic_id_mapping_manager = TopicIDMappingManager()
async def generate_topics(self, dates: Optional[str] = None, numTopics: int = 5,
styles: Optional[List[str]] = None,
2025-07-11 17:39:51 +08:00
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, List[Dict[str, Any]]]:
2025-07-10 17:51:37 +08:00
"""
生成选题
Args:
2025-07-11 17:39:51 +08:00
dates: 日期字符串可能为单个日期多个日期用逗号分隔或范围
2025-07-15 10:59:36 +08:00
numTopics: 要生成的选题数量
2025-07-11 17:39:51 +08:00
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
style_objects: 风格对象列表
audience_objects: 受众对象列表
scenic_spot_objects: 景区对象列表
product_objects: 产品对象列表
2025-07-10 17:51:37 +08:00
Returns:
请求ID和生成的选题列表
"""
2025-07-15 10:59:36 +08:00
logger.info(f"开始生成选题,日期: {dates}, 数量: {numTopics}")
2025-07-10 17:51:37 +08:00
# 获取并更新配置
topic_config = self.config_manager.get_config('topic_gen', GenerateTopicConfig)
2025-07-11 17:39:51 +08:00
if dates:
topic_config.topic.date = dates
2025-07-15 10:59:36 +08:00
topic_config.topic.num = numTopics
2025-07-10 17:51:37 +08:00
2025-08-04 13:43:52 +08:00
# 建立ID映射关系用于后续的ID反向映射
self.topic_id_mapping_manager.add_objects_mapping(
style_objects=style_objects,
audience_objects=audience_objects,
scenic_spot_objects=scenic_spot_objects,
product_objects=product_objects
)
# 使用PromptBuilderService构建提示词
system_prompt, user_prompt = self.prompt_builder.build_topic_prompt(
2025-07-11 17:39:51 +08:00
products=products,
scenic_spots=scenic_spots,
styles=styles,
audiences=audiences,
dates=dates,
numTopics=numTopics,
style_objects=style_objects,
audience_objects=audience_objects,
scenic_spot_objects=scenic_spot_objects,
product_objects=product_objects
)
# 使用预构建的提示词生成选题
topics = await self.topic_generator.generate_topics_with_prompt(system_prompt, user_prompt)
2025-07-10 17:51:37 +08:00
if not topics:
logger.error("未能生成任何选题")
return str(uuid.uuid4()), []
2025-08-04 13:43:52 +08:00
# 为每个选题添加相关的对象ID
logger.info(f"开始为 {len(topics)} 个选题进行ID映射分析")
enhanced_topics = []
total_mapped_topics = 0
for topic in topics:
enhanced_topic = topic.copy()
# 查找选题中涉及的对象ID
related_object_ids = self.topic_id_mapping_manager.find_ids_in_topic(topic)
# 只有当找到相关ID时才添加字段
if any(related_object_ids.values()):
enhanced_topic['related_object_ids'] = related_object_ids
total_mapped_topics += 1
logger.info(f"选题 {topic.get('index', 'N/A')} 找到相关ID: {related_object_ids}")
enhanced_topics.append(enhanced_topic)
# 统计总体映射情况
mapping_coverage = (total_mapped_topics / len(topics) * 100) if topics else 0
logger.info(f"选题ID映射完成: {total_mapped_topics}/{len(topics)} 个选题包含相关对象ID覆盖率: {mapping_coverage:.1f}%")
2025-07-10 17:51:37 +08:00
# 生成请求ID
2025-07-15 10:59:36 +08:00
requestId = f"topic-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
2025-07-10 17:51:37 +08:00
2025-08-04 13:43:52 +08:00
logger.info(f"选题生成完成请求ID: {requestId}, 数量: {len(enhanced_topics)}")
return requestId, enhanced_topics
2025-07-10 17:51:37 +08:00
2025-07-29 20:37:43 +08:00
async def generate_content(self, topic: Optional[Dict[str, Any]] = None, autoJudge: bool = False,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str, Dict[str, Any]]:
2025-07-10 17:51:37 +08:00
"""
2025-07-29 20:37:43 +08:00
为单个选题生成内容
2025-07-10 17:51:37 +08:00
Args:
2025-07-29 20:37:43 +08:00
topic: 选题信息可能包含ID字段
autoJudge: 是否进行内嵌审核
style_objects: 风格对象列表可选用于兼容
audience_objects: 受众对象列表可选用于兼容
scenic_spot_objects: 景区对象列表可选用于兼容
product_objects: 产品对象列表可选用于兼容
2025-07-10 17:51:37 +08:00
Returns:
请求ID选题索引和生成的内容包含judgeSuccess状态
2025-07-10 17:51:37 +08:00
"""
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
2025-07-15 17:18:46 +08:00
topicIndex = topic.get('index', 'N/A')
2025-07-15 10:59:36 +08:00
logger.info(f"开始为选题 {topicIndex} 生成内容{'(含审核)' if autoJudge else ''}")
2025-07-29 20:37:43 +08:00
# 增强版的topic处理优先使用ID获取最新数据
enhanced_topic = await self._enhance_topic_with_database_data(topic)
# 如果没有通过ID获取到数据使用传入的对象参数作为兜底
if style_objects and not enhanced_topic.get('style_object'):
enhanced_topic['style_object'] = style_objects[0]
enhanced_topic['style'] = style_objects[0].get('styleName')
2025-07-29 20:37:43 +08:00
if audience_objects and not enhanced_topic.get('audience_object'):
enhanced_topic['audience_object'] = audience_objects[0]
enhanced_topic['targetAudience'] = audience_objects[0].get('audienceName')
2025-07-29 20:37:43 +08:00
if scenic_spot_objects and not enhanced_topic.get('scenic_spot_object'):
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
enhanced_topic['object'] = scenic_spot_objects[0].get('name')
2025-07-29 20:37:43 +08:00
if product_objects and not enhanced_topic.get('product_object'):
enhanced_topic['product_object'] = product_objects[0]
enhanced_topic['product'] = product_objects[0].get('productName')
2025-07-29 20:37:43 +08:00
# 使用PromptBuilderService构建提示词
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
# 使用预构建的提示词生成内容
content = await self.content_generator.generate_content_with_prompt(enhanced_topic, system_prompt, user_prompt)
2025-07-15 10:59:36 +08:00
if not content:
logger.error(f"未能为选题 {topicIndex} 生成内容")
return str(uuid.uuid4()), topicIndex, {}
2025-07-15 10:59:36 +08:00
# 如果启用自动审核,进行内嵌审核
if autoJudge:
try:
2025-07-15 10:59:36 +08:00
logger.info(f"开始对选题 {topicIndex} 的内容进行内嵌审核")
# 使用PromptBuilderService构建审核提示词
judge_system_prompt, judge_user_prompt = self.prompt_builder.build_judge_prompt(enhanced_topic, content)
2025-07-15 10:59:36 +08:00
# 进行内容审核
judged_content = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, judge_system_prompt, judge_user_prompt)
# 统一输出格式始终包含judgeSuccess状态
# content_judger返回的是judge_success字段下划线命名
judge_success = judged_content.get('judge_success', False)
if judge_success:
logger.info(f"选题 {topicIndex} 内容审核成功")
# 审核成功使用审核后的内容但移除judge_success添加统一的judgeSuccess
content = {k: v for k, v in judged_content.items() if k != 'judge_success'}
content['judgeSuccess'] = True
else:
2025-07-29 20:37:43 +08:00
logger.warning(f"选题 {topicIndex} 内容审核未通过")
# 审核失败使用原始内容添加judgeSuccess状态
2025-07-15 10:59:36 +08:00
content['judgeSuccess'] = False
except Exception as e:
2025-07-29 20:37:43 +08:00
logger.error(f"选题 {topicIndex} 内容审核过程中发生错误: {e}", exc_info=True)
# 审核出错:使用原始内容,标记审核失败
2025-07-15 10:59:36 +08:00
content['judgeSuccess'] = False
2025-07-10 17:51:37 +08:00
# 生成请求ID
2025-07-15 10:59:36 +08:00
requestId = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
2025-07-10 17:51:37 +08:00
2025-07-29 20:37:43 +08:00
logger.info(f"选题 {topicIndex} 内容生成完成请求ID: {requestId}")
2025-07-15 10:59:36 +08:00
return requestId, topicIndex, content
2025-07-29 20:37:43 +08:00
async def _enhance_topic_with_database_data(self, topic: Dict[str, Any]) -> Dict[str, Any]:
"""
2025-08-04 13:43:52 +08:00
使用数据库数据增强选题信息优先使用related_object_ids中的反射ID
2025-07-29 20:37:43 +08:00
Args:
2025-08-04 13:43:52 +08:00
topic: 原始选题数据可能包含related_object_ids字段
2025-07-29 20:37:43 +08:00
Returns:
增强后的选题数据
"""
enhanced_topic = topic.copy()
try:
# 通过数据库服务获取详细信息
db_service = DatabaseService(self.config_manager)
if not db_service.is_available():
logger.warning("数据库服务不可用,无法增强选题数据")
return enhanced_topic
2025-08-04 13:43:52 +08:00
# 优先使用related_object_ids中的反射ID更精确
related_ids = topic.get('related_object_ids', {})
if related_ids:
logger.info(f"选题包含反射ID信息优先使用: {related_ids}")
enhanced_topic = await self._enhance_with_related_ids(enhanced_topic, db_service, related_ids)
return enhanced_topic
2025-07-29 20:37:43 +08:00
# 处理风格ID
if 'styleIds' in topic and topic['styleIds']:
style_id = topic['styleIds'][0] if isinstance(topic['styleIds'], list) else topic['styleIds']
style_data = db_service.get_style_by_id(style_id)
if style_data:
2025-08-04 13:43:52 +08:00
style_name = style_data.get('styleName')
2025-07-29 20:37:43 +08:00
enhanced_topic['style_object'] = style_data
2025-08-04 13:43:52 +08:00
enhanced_topic['style'] = style_name
logger.info(f"从数据库加载风格数据: {style_name} (ID: {style_id})")
2025-07-29 20:37:43 +08:00
# 处理受众ID
if 'audienceIds' in topic and topic['audienceIds']:
audience_id = topic['audienceIds'][0] if isinstance(topic['audienceIds'], list) else topic['audienceIds']
audience_data = db_service.get_audience_by_id(audience_id)
if audience_data:
2025-08-04 13:43:52 +08:00
audience_name = audience_data.get('audienceName')
2025-07-29 20:37:43 +08:00
enhanced_topic['audience_object'] = audience_data
2025-08-04 13:43:52 +08:00
enhanced_topic['targetAudience'] = audience_name
logger.info(f"从数据库加载受众数据: {audience_name} (ID: {audience_id})")
2025-07-29 20:37:43 +08:00
# 处理景区ID
if 'scenicSpotIds' in topic and topic['scenicSpotIds']:
spot_id = topic['scenicSpotIds'][0] if isinstance(topic['scenicSpotIds'], list) else topic['scenicSpotIds']
spot_data = db_service.get_scenic_spot_by_id(spot_id)
if spot_data:
2025-08-04 13:43:52 +08:00
spot_name = spot_data.get('name')
2025-07-29 20:37:43 +08:00
enhanced_topic['scenic_spot_object'] = spot_data
2025-08-04 13:43:52 +08:00
enhanced_topic['object'] = spot_name
logger.info(f"从数据库加载景区数据: {spot_name} (ID: {spot_id})")
2025-07-29 20:37:43 +08:00
# 处理产品ID
if 'productIds' in topic and topic['productIds']:
product_id = topic['productIds'][0] if isinstance(topic['productIds'], list) else topic['productIds']
product_data = db_service.get_product_by_id(product_id)
if product_data:
2025-08-04 13:43:52 +08:00
product_name = product_data.get('productName')
2025-07-29 20:37:43 +08:00
enhanced_topic['product_object'] = product_data
2025-08-04 13:43:52 +08:00
enhanced_topic['product'] = product_name
logger.info(f"从数据库加载产品数据: {product_name} (ID: {product_id})")
2025-07-29 20:37:43 +08:00
except Exception as e:
logger.error(f"增强选题数据时发生错误: {e}", exc_info=True)
return enhanced_topic
2025-08-04 13:43:52 +08:00
async def _enhance_with_related_ids(self, enhanced_topic: Dict[str, Any], db_service, related_ids: Dict[str, List[str]]) -> Dict[str, Any]:
"""
使用related_object_ids中的反射ID进行数据库查询和增强
Args:
enhanced_topic: 待增强的选题数据
db_service: 数据库服务实例
related_ids: 反射的ID字典
Returns:
增强后的选题数据
"""
logger.info("开始使用反射ID进行选题数据增强")
try:
# 处理风格ID
style_ids = related_ids.get('style_ids', [])
if style_ids:
style_id = int(style_ids[0]) # 取第一个ID
style_data = db_service.get_style_by_id(style_id)
if style_data:
enhanced_topic['style_object'] = style_data
enhanced_topic['style'] = style_data.get('styleName')
logger.info(f"通过反射ID加载风格数据: {style_data.get('styleName')} (ID: {style_id})")
# 处理受众ID
audience_ids = related_ids.get('audience_ids', [])
if audience_ids:
audience_id = int(audience_ids[0]) # 取第一个ID
audience_data = db_service.get_audience_by_id(audience_id)
if audience_data:
enhanced_topic['audience_object'] = audience_data
enhanced_topic['targetAudience'] = audience_data.get('audienceName')
logger.info(f"通过反射ID加载受众数据: {audience_data.get('audienceName')} (ID: {audience_id})")
# 处理景区ID
scenic_spot_ids = related_ids.get('scenic_spot_ids', [])
if scenic_spot_ids:
spot_id = int(scenic_spot_ids[0]) # 取第一个ID
spot_data = db_service.get_scenic_spot_by_id(spot_id)
if spot_data:
enhanced_topic['scenic_spot_object'] = spot_data
enhanced_topic['object'] = spot_data.get('name')
logger.info(f"通过反射ID加载景区数据: {spot_data.get('name')} (ID: {spot_id})")
# 处理产品ID
product_ids = related_ids.get('product_ids', [])
if product_ids:
product_id = int(product_ids[0]) # 取第一个ID
product_data = db_service.get_product_by_id(product_id)
if product_data:
enhanced_topic['product_object'] = product_data
enhanced_topic['product'] = product_data.get('productName')
logger.info(f"通过反射ID加载产品数据: {product_data.get('productName')} (ID: {product_id})")
logger.info("反射ID数据增强完成")
except Exception as e:
logger.error(f"使用反射ID增强选题数据时发生错误: {e}", exc_info=True)
return enhanced_topic
2025-07-10 17:51:37 +08:00
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Tuple[str, str, Dict[str, Any]]:
"""
使用预构建的提示词为选题生成内容
Args:
topic: 选题信息
system_prompt: 系统提示词
user_prompt: 用户提示词
Returns:
请求ID选题索引和生成的内容
"""
2025-07-15 10:59:36 +08:00
topicIndex = topic.get('index', 'unknown')
logger.info(f"开始使用预构建提示词为选题 {topicIndex} 生成内容")
2025-07-15 10:59:36 +08:00
# 直接使用预构建的提示词生成内容
content = await self.content_generator.generate_content_with_prompt(topic, system_prompt, user_prompt)
# 生成请求ID
2025-07-15 10:59:36 +08:00
requestId = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
2025-07-15 10:59:36 +08:00
logger.info(f"内容生成完成请求ID: {requestId}, 选题索引: {topicIndex}")
return requestId, topicIndex, content
async def judge_content(self, topic: Optional[Dict[str, Any]] = None, content: Dict[str, Any] = {},
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str, Dict[str, Any], bool]:
2025-07-10 17:51:37 +08:00
"""
审核内容 (已重构)
2025-07-10 17:51:37 +08:00
"""
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
if not content:
content = {"title": "未提供内容", "content": "未提供内容"}
2025-07-15 10:59:36 +08:00
topicIndex = topic.get('index', 'unknown')
logger.info(f"开始审核选题 {topicIndex} 的内容")
2025-07-10 17:51:37 +08:00
# 构建包含所有预取信息的enhanced_topic
enhanced_topic = topic.copy()
if style_objects:
enhanced_topic['style_object'] = style_objects[0]
if audience_objects:
enhanced_topic['audience_object'] = audience_objects[0]
if scenic_spot_objects:
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
if product_objects:
enhanced_topic['product_object'] = product_objects[0]
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(enhanced_topic, content)
judged_data = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, system_prompt, user_prompt)
2025-07-15 10:59:36 +08:00
judgeSuccess = judged_data.get('judge_success', False)
if 'judge_success' in judged_data:
judged_data = {k: v for k, v in judged_data.items() if k != 'judge_success'}
judged_data['judgeSuccess'] = judgeSuccess
2025-07-10 17:51:37 +08:00
2025-07-15 10:59:36 +08:00
requestId = f"judge-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
logger.info(f"内容审核完成请求ID: {requestId}, 选题索引: {topicIndex}, 审核结果: {judgeSuccess}")
return requestId, topicIndex, judged_data, judgeSuccess
async def run_pipeline(self, dates: Optional[str] = None, numTopics: int = 5,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
skipJudge: bool = False,
autoJudge: bool = False) -> Tuple[str, List[Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]:
2025-07-10 17:51:37 +08:00
"""
2025-07-15 10:59:36 +08:00
运行完整的内容生成流水线生成选题 生成内容 审核内容
2025-07-10 17:51:37 +08:00
Args:
2025-07-11 17:39:51 +08:00
dates: 日期字符串可能为单个日期多个日期用逗号分隔或范围
2025-07-15 10:59:36 +08:00
numTopics: 要生成的选题数量
2025-07-11 17:39:51 +08:00
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
2025-07-15 10:59:36 +08:00
skipJudge: 是否跳过内容审核步骤与autoJudge互斥
autoJudge: 是否在内容生成时进行内嵌审核
2025-07-10 17:51:37 +08:00
Returns:
2025-07-15 10:59:36 +08:00
请求ID选题列表内容字典和审核后内容字典
2025-07-10 17:51:37 +08:00
"""
2025-07-15 10:59:36 +08:00
logger.info(f"开始运行完整流水线,日期: {dates}, 数量: {numTopics}, 内嵌审核: {autoJudge}")
2025-07-10 17:51:37 +08:00
# 生成请求ID
2025-07-15 10:59:36 +08:00
requestId = f"pipeline-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
2025-07-10 17:51:37 +08:00
2025-07-15 10:59:36 +08:00
# 1. 生成选题
_, topics = await self.generate_topics(dates, numTopics, styles, audiences, scenic_spots, products)
2025-07-10 17:51:37 +08:00
if not topics:
2025-07-15 10:59:36 +08:00
logger.error("未能生成任何选题")
return requestId, [], {}, {}
# 2. 为每个选题生成内容
2025-07-10 17:51:37 +08:00
contents = {}
judgedContents = {}
2025-07-10 17:51:37 +08:00
for topic in topics:
2025-07-15 10:59:36 +08:00
topicIndex = topic.get('index', 'unknown')
2025-07-29 20:37:43 +08:00
# 直接传递带有ID的选题数据不再需要传递额外的对象参数
2025-07-15 10:59:36 +08:00
_, _, content = await self.generate_content(topic, autoJudge=autoJudge)
if autoJudge:
# 内嵌审核模式content已包含审核结果和judgeSuccess状态
# 创建原始内容副本移除judgeSuccess状态但保留其他字段
original_content = {k: v for k, v in content.items() if k != 'judgeSuccess'}
contents[topicIndex] = original_content
judgedContents[topicIndex] = content # 包含审核结果和judgeSuccess状态
else:
# 无审核模式:直接保存内容
contents[topicIndex] = content
2025-07-10 17:51:37 +08:00
# 如果使用内嵌审核或跳过审核,直接返回结果
2025-07-15 10:59:36 +08:00
if autoJudge or skipJudge:
logger.info(f"{'使用内嵌审核' if autoJudge else '跳过内容审核步骤'}流水线完成请求ID: {requestId}")
if autoJudge:
return requestId, topics, contents, judgedContents
else:
return requestId, topics, contents, contents
2025-07-10 17:51:37 +08:00
2025-07-15 10:59:36 +08:00
# 3. 对每个内容进行审核
judgedContents = {}
for topicIndex, content in contents.items():
topic = next((t for t in topics if t.get('index') == topicIndex), None)
2025-07-10 17:51:37 +08:00
if not topic:
2025-07-15 10:59:36 +08:00
logger.warning(f"找不到选题 {topicIndex} 的原始数据,跳过审核")
2025-07-10 17:51:37 +08:00
continue
try:
2025-07-15 10:59:36 +08:00
_, _, judged_data, _ = await self.judge_content(topic, content)
judgedContents[topicIndex] = judged_data
2025-07-10 17:51:37 +08:00
except Exception as e:
2025-07-15 10:59:36 +08:00
logger.critical(f"为选题 {topicIndex} 处理内容审核时发生意外错误: {e}", exc_info=True)
2025-07-10 17:51:37 +08:00
2025-07-15 10:59:36 +08:00
logger.info(f"流水线完成请求ID: {requestId}")
return requestId, topics, contents, judgedContents