313 lines
14 KiB
Python
313 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
文字内容服务层
|
||
封装现有功能,提供API调用
|
||
"""
|
||
|
||
import logging
|
||
import uuid
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
from datetime import datetime
|
||
|
||
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
|
||
from core.ai import AIAgent
|
||
from utils.file_io import OutputManager
|
||
from tweet.topic_generator import TopicGenerator
|
||
from tweet.content_generator import ContentGenerator
|
||
from tweet.content_judger import ContentJudger
|
||
from api.services.prompt_builder import PromptBuilderService
|
||
from api.services.prompt_service import PromptService
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TweetService:
|
||
"""文字内容服务类"""
|
||
|
||
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
|
||
"""
|
||
初始化文字内容服务
|
||
|
||
Args:
|
||
ai_agent: AI代理
|
||
config_manager: 配置管理器
|
||
output_manager: 输出管理器
|
||
"""
|
||
self.ai_agent = ai_agent
|
||
self.config_manager = config_manager
|
||
self.output_manager = output_manager
|
||
|
||
# 初始化各个组件
|
||
self.topic_generator = TopicGenerator(ai_agent, config_manager, output_manager)
|
||
self.content_generator = ContentGenerator(ai_agent, config_manager, output_manager)
|
||
self.content_judger = ContentJudger(ai_agent, config_manager, output_manager)
|
||
|
||
# 初始化提示词服务和构建器
|
||
self.prompt_service = PromptService(config_manager)
|
||
self.prompt_builder = PromptBuilderService(config_manager, self.prompt_service)
|
||
|
||
async def generate_topics(self, dates: Optional[str] = None, num_topics: int = 5,
|
||
styles: Optional[List[str]] = None,
|
||
audiences: Optional[List[str]] = None,
|
||
scenic_spots: Optional[List[str]] = None,
|
||
products: Optional[List[str]] = None) -> Tuple[str, List[Dict[str, Any]]]:
|
||
"""
|
||
生成选题
|
||
|
||
Args:
|
||
dates: 日期字符串,可能为单个日期、多个日期用逗号分隔或范围
|
||
num_topics: 要生成的选题数量
|
||
styles: 风格列表
|
||
audiences: 受众列表
|
||
scenic_spots: 景区列表
|
||
products: 产品列表
|
||
|
||
Returns:
|
||
请求ID和生成的选题列表
|
||
"""
|
||
logger.info(f"开始生成选题,日期: {dates}, 数量: {num_topics}")
|
||
|
||
# 获取并更新配置
|
||
topic_config = self.config_manager.get_config('topic_gen', GenerateTopicConfig)
|
||
if dates:
|
||
topic_config.topic.date = dates
|
||
topic_config.topic.num = num_topics
|
||
|
||
# 使用PromptBuilderService构建提示词
|
||
system_prompt, user_prompt = self.prompt_builder.build_topic_prompt(
|
||
products=products,
|
||
scenic_spots=scenic_spots,
|
||
styles=styles,
|
||
audiences=audiences,
|
||
dates=dates,
|
||
num_topics=num_topics
|
||
)
|
||
|
||
# 使用预构建的提示词生成选题
|
||
topics = await self.topic_generator.generate_topics_with_prompt(system_prompt, user_prompt)
|
||
if not topics:
|
||
logger.error("未能生成任何选题")
|
||
return str(uuid.uuid4()), []
|
||
|
||
# 生成请求ID
|
||
request_id = f"topic-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||
|
||
logger.info(f"选题生成完成,请求ID: {request_id}, 数量: {len(topics)}")
|
||
return request_id, topics
|
||
|
||
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
|
||
styles: Optional[List[str]] = None,
|
||
audiences: Optional[List[str]] = None,
|
||
scenic_spots: Optional[List[str]] = None,
|
||
products: Optional[List[str]] = None,
|
||
auto_judge: bool = False) -> Tuple[str, str, Dict[str, Any]]:
|
||
"""
|
||
为选题生成内容
|
||
|
||
Args:
|
||
topic: 选题信息
|
||
styles: 风格列表
|
||
audiences: 受众列表
|
||
scenic_spots: 景区列表
|
||
products: 产品列表
|
||
auto_judge: 是否自动进行内容审核
|
||
|
||
Returns:
|
||
请求ID、选题索引和生成的内容(如果启用审核则返回审核后的内容)
|
||
"""
|
||
# 如果没有提供topic,创建一个基础的topic
|
||
if not topic:
|
||
topic = {"index": "1", "date": "2024-07-01"}
|
||
|
||
topic_index = topic.get('index', 'unknown')
|
||
logger.info(f"开始为选题 {topic_index} 生成内容{'(含审核)' if auto_judge else ''}")
|
||
|
||
# 创建topic的副本并应用覆盖参数
|
||
enhanced_topic = topic.copy()
|
||
if styles and len(styles) > 0:
|
||
enhanced_topic['style'] = styles[0] # 使用第一个风格
|
||
if audiences and len(audiences) > 0:
|
||
enhanced_topic['target_audience'] = audiences[0] # 使用第一个受众
|
||
if scenic_spots and len(scenic_spots) > 0:
|
||
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
|
||
if products and len(products) > 0:
|
||
enhanced_topic['product'] = products[0] # 使用第一个产品
|
||
|
||
# 使用PromptBuilderService构建提示词
|
||
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
|
||
|
||
# 使用预构建的提示词生成内容
|
||
content = await self.content_generator.generate_content_with_prompt(enhanced_topic, system_prompt, user_prompt)
|
||
|
||
# 如果启用自动审核,则进行内嵌审核
|
||
if auto_judge:
|
||
logger.info(f"开始对选题 {topic_index} 的内容进行内嵌审核")
|
||
try:
|
||
# 构建简化的审核提示词(只需要产品信息、景区信息和文章)
|
||
judge_system_prompt, judge_user_prompt = self.prompt_builder.build_judge_prompt_simple(enhanced_topic, content)
|
||
|
||
# 进行审核
|
||
judged_content = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, judge_system_prompt, judge_user_prompt)
|
||
|
||
if judged_content.get('judge_success', False):
|
||
logger.info(f"选题 {topic_index} 内容审核成功,使用审核后的内容")
|
||
content = judged_content
|
||
else:
|
||
logger.warning(f"选题 {topic_index} 内容审核失败,使用原始内容")
|
||
# 为原始内容添加审核失败标记
|
||
content['judge_success'] = False
|
||
|
||
except Exception as e:
|
||
logger.error(f"选题 {topic_index} 内嵌审核失败: {e},使用原始内容")
|
||
# 为原始内容添加审核失败标记
|
||
content['judge_success'] = False
|
||
|
||
# 生成请求ID
|
||
request_id = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||
|
||
logger.info(f"内容生成完成,请求ID: {request_id}, 选题索引: {topic_index}")
|
||
return request_id, topic_index, content
|
||
|
||
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Tuple[str, str, Dict[str, Any]]:
|
||
"""
|
||
使用预构建的提示词为选题生成内容
|
||
|
||
Args:
|
||
topic: 选题信息
|
||
system_prompt: 系统提示词
|
||
user_prompt: 用户提示词
|
||
|
||
Returns:
|
||
请求ID、选题索引和生成的内容
|
||
"""
|
||
topic_index = topic.get('index', 'unknown')
|
||
logger.info(f"开始使用预构建提示词为选题 {topic_index} 生成内容")
|
||
|
||
# 使用预构建的提示词生成内容
|
||
content = await self.content_generator.generate_content_with_prompt(topic, system_prompt, user_prompt)
|
||
|
||
# 生成请求ID
|
||
request_id = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||
|
||
logger.info(f"内容生成完成,请求ID: {request_id}, 选题索引: {topic_index}")
|
||
return request_id, topic_index, content
|
||
|
||
async def judge_content(self, topic: Optional[Dict[str, Any]] = None, content: Dict[str, Any] = {},
|
||
styles: Optional[List[str]] = None,
|
||
audiences: Optional[List[str]] = None,
|
||
scenic_spots: Optional[List[str]] = None,
|
||
products: Optional[List[str]] = None) -> Tuple[str, str, Dict[str, Any], bool]:
|
||
"""
|
||
审核内容
|
||
|
||
Args:
|
||
topic: 选题信息
|
||
content: 要审核的内容
|
||
styles: 风格列表
|
||
audiences: 受众列表
|
||
scenic_spots: 景区列表
|
||
products: 产品列表
|
||
|
||
Returns:
|
||
请求ID、选题索引、审核后的内容和审核是否成功
|
||
"""
|
||
# 如果没有提供topic,创建一个基础的topic
|
||
if not topic:
|
||
topic = {"index": "1", "date": "2024-07-01"}
|
||
|
||
# 如果没有提供content,返回错误
|
||
if not content:
|
||
content = {"title": "未提供内容", "content": "未提供内容"}
|
||
|
||
topic_index = topic.get('index', 'unknown')
|
||
logger.info(f"开始审核选题 {topic_index} 的内容")
|
||
|
||
# 创建topic的副本并应用覆盖参数
|
||
enhanced_topic = topic.copy()
|
||
if styles and len(styles) > 0:
|
||
enhanced_topic['style'] = styles[0] # 使用第一个风格
|
||
if audiences and len(audiences) > 0:
|
||
enhanced_topic['target_audience'] = audiences[0] # 使用第一个受众
|
||
if scenic_spots and len(scenic_spots) > 0:
|
||
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
|
||
if products and len(products) > 0:
|
||
enhanced_topic['product'] = products[0] # 使用第一个产品
|
||
|
||
# 使用PromptBuilderService构建提示词
|
||
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(enhanced_topic, content)
|
||
|
||
# 审核内容
|
||
judged_data = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, system_prompt, user_prompt)
|
||
judge_success = judged_data.get('judge_success', False)
|
||
|
||
# 生成请求ID
|
||
request_id = f"judge-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||
|
||
logger.info(f"内容审核完成,请求ID: {request_id}, 选题索引: {topic_index}, 审核结果: {judge_success}")
|
||
return request_id, topic_index, judged_data, judge_success
|
||
|
||
async def run_pipeline(self, dates: Optional[str] = None, num_topics: int = 5,
|
||
styles: Optional[List[str]] = None,
|
||
audiences: Optional[List[str]] = None,
|
||
scenic_spots: Optional[List[str]] = None,
|
||
products: Optional[List[str]] = None,
|
||
skip_judge: bool = False,
|
||
auto_judge: bool = False) -> Tuple[str, List[Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]:
|
||
"""
|
||
运行完整流水线
|
||
|
||
Args:
|
||
dates: 日期字符串,可能为单个日期、多个日期用逗号分隔或范围
|
||
num_topics: 要生成的选题数量
|
||
styles: 风格列表
|
||
audiences: 受众列表
|
||
scenic_spots: 景区列表
|
||
products: 产品列表
|
||
skip_judge: 是否跳过内容审核步骤(与auto_judge互斥)
|
||
auto_judge: 是否在内容生成时进行内嵌审核
|
||
|
||
Returns:
|
||
请求ID、生成的选题列表、生成的内容和审核后的内容
|
||
"""
|
||
logger.info(f"开始运行完整流水线,日期: {dates}, 数量: {num_topics}, 内嵌审核: {auto_judge}")
|
||
|
||
# 生成请求ID
|
||
request_id = f"pipeline-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||
|
||
# 步骤1: 生成选题
|
||
_, topics = await self.generate_topics(dates, num_topics, styles, audiences, scenic_spots, products)
|
||
if not topics:
|
||
logger.error("未能生成任何选题,流程终止")
|
||
return request_id, [], {}, {}
|
||
|
||
# 步骤2: 为每个选题生成内容(可选择内嵌审核)
|
||
contents = {}
|
||
for topic in topics:
|
||
topic_index = topic.get('index', 'unknown')
|
||
_, _, content = await self.generate_content(topic, auto_judge=auto_judge)
|
||
contents[topic_index] = content
|
||
|
||
# 如果使用内嵌审核或跳过审核,直接返回结果
|
||
if auto_judge or skip_judge:
|
||
logger.info(f"{'使用内嵌审核' if auto_judge else '跳过内容审核步骤'},流水线完成,请求ID: {request_id}")
|
||
return request_id, topics, contents, contents
|
||
|
||
# 步骤3: 独立审核内容(仅在未使用内嵌审核且未跳过审核时执行)
|
||
judged_contents = {}
|
||
for topic_index, content in contents.items():
|
||
topic = next((t for t in topics if t.get('index') == topic_index), None)
|
||
if not topic:
|
||
logger.warning(f"找不到选题 {topic_index} 的原始数据,跳过审核")
|
||
continue
|
||
|
||
try:
|
||
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(topic, content)
|
||
judged_data = await self.content_judger.judge_content_with_prompt(content, topic, system_prompt, user_prompt)
|
||
judged_contents[topic_index] = judged_data
|
||
except Exception as e:
|
||
logger.critical(f"为选题 {topic_index} 处理内容审核时发生意外错误: {e}", exc_info=True)
|
||
|
||
logger.info(f"流水线完成,请求ID: {request_id}")
|
||
return request_id, topics, contents, judged_contents |