#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 文字内容服务层 封装现有功能,提供API调用 """ import logging import uuid from typing import List, Dict, Any, Optional, Tuple from datetime import datetime from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig from core.ai import AIAgent from utils.file_io import OutputManager from tweet.topic_generator import TopicGenerator from tweet.content_generator import ContentGenerator from tweet.content_judger import ContentJudger from api.services.prompt_builder import PromptBuilderService from api.services.prompt_service import PromptService logger = logging.getLogger(__name__) class TweetService: """文字内容服务类""" def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager): """ 初始化文字内容服务 Args: ai_agent: AI代理 config_manager: 配置管理器 output_manager: 输出管理器 """ self.ai_agent = ai_agent self.config_manager = config_manager self.output_manager = output_manager # 初始化各个组件 self.topic_generator = TopicGenerator(ai_agent, config_manager, output_manager) self.content_generator = ContentGenerator(ai_agent, config_manager, output_manager) self.content_judger = ContentJudger(ai_agent, config_manager, output_manager) # 初始化提示词服务和构建器 self.prompt_service = PromptService(config_manager) self.prompt_builder = PromptBuilderService(config_manager, self.prompt_service) async def generate_topics(self, date: str, num_topics: int = 5, style: Optional[str] = None, target_audience: Optional[str] = None) -> Tuple[str, List[Dict[str, Any]]]: """ 生成选题 Args: date: 选题日期,格式为YYYY-MM-DD num_topics: 要生成的选题数量 style: 内容风格 target_audience: 目标受众 Returns: 请求ID和生成的选题列表 """ logger.info(f"开始生成选题,日期: {date}, 数量: {num_topics}") # 获取并更新配置 topic_config = self.config_manager.get_config('topic_gen', GenerateTopicConfig) topic_config.topic.date = date topic_config.topic.num = num_topics # 使用PromptBuilderService构建提示词 system_prompt, user_prompt = self.prompt_builder.build_topic_prompt( num_topics=num_topics, month=date ) # 使用预构建的提示词生成选题 topics = await self.topic_generator.generate_topics_with_prompt(system_prompt, user_prompt) if not topics: logger.error("未能生成任何选题") return str(uuid.uuid4()), [] # 生成请求ID request_id = f"topic_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" logger.info(f"选题生成完成,请求ID: {request_id}, 数量: {len(topics)}") return request_id, topics async def generate_content(self, topic: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any]]: """ 为选题生成内容 Args: topic: 选题信息 Returns: 请求ID、选题索引和生成的内容 """ topic_index = topic.get('index', 'unknown') logger.info(f"开始为选题 {topic_index} 生成内容") # 使用PromptBuilderService构建提示词 system_prompt, user_prompt = self.prompt_builder.build_content_prompt(topic, "content") # 使用预构建的提示词生成内容 content = await self.content_generator.generate_content_with_prompt(topic, system_prompt, user_prompt) # 生成请求ID request_id = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" logger.info(f"内容生成完成,请求ID: {request_id}, 选题索引: {topic_index}") return request_id, topic_index, content async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Tuple[str, str, Dict[str, Any]]: """ 使用预构建的提示词为选题生成内容 Args: topic: 选题信息 system_prompt: 系统提示词 user_prompt: 用户提示词 Returns: 请求ID、选题索引和生成的内容 """ topic_index = topic.get('index', 'unknown') logger.info(f"开始使用预构建提示词为选题 {topic_index} 生成内容") # 使用预构建的提示词生成内容 content = await self.content_generator.generate_content_with_prompt(topic, system_prompt, user_prompt) # 生成请求ID request_id = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" logger.info(f"内容生成完成,请求ID: {request_id}, 选题索引: {topic_index}") return request_id, topic_index, content async def judge_content(self, topic: Dict[str, Any], content: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any], bool]: """ 审核内容 Args: topic: 选题信息 content: 要审核的内容 Returns: 请求ID、选题索引、审核后的内容和审核是否成功 """ topic_index = topic.get('index', 'unknown') logger.info(f"开始审核选题 {topic_index} 的内容") # 使用PromptBuilderService构建提示词 system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(topic, content) # 审核内容 judged_data = await self.content_judger.judge_content_with_prompt(content, topic, system_prompt, user_prompt) judge_success = judged_data.get('judge_success', False) # 生成请求ID request_id = f"judge_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" logger.info(f"内容审核完成,请求ID: {request_id}, 选题索引: {topic_index}, 审核结果: {judge_success}") return request_id, topic_index, judged_data, judge_success async def run_pipeline(self, date: str, num_topics: int = 5, style: Optional[str] = None, target_audience: Optional[str] = None, skip_judge: bool = False) -> Tuple[str, List[Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]: """ 运行完整流水线 Args: date: 选题日期,格式为YYYY-MM-DD num_topics: 要生成的选题数量 style: 内容风格 target_audience: 目标受众 skip_judge: 是否跳过内容审核步骤 Returns: 请求ID、生成的选题列表、生成的内容和审核后的内容 """ logger.info(f"开始运行完整流水线,日期: {date}, 数量: {num_topics}") # 生成请求ID request_id = f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}" # 步骤1: 生成选题 _, topics = await self.generate_topics(date, num_topics, style, target_audience) if not topics: logger.error("未能生成任何选题,流程终止") return request_id, [], {}, {} # 步骤2: 为每个选题生成内容 contents = {} for topic in topics: topic_index = topic.get('index', 'unknown') _, _, content = await self.generate_content(topic) contents[topic_index] = content # 如果跳过审核,直接返回结果 if skip_judge: logger.info(f"跳过内容审核步骤,流水线完成,请求ID: {request_id}") return request_id, topics, contents, contents # 步骤3: 审核内容 judged_contents = {} for topic_index, content in contents.items(): topic = next((t for t in topics if t.get('index') == topic_index), None) if not topic: logger.warning(f"找不到选题 {topic_index} 的原始数据,跳过审核") continue try: system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(topic, content) judged_data = await self.content_judger.judge_content_with_prompt(content, topic, system_prompt, user_prompt) judged_contents[topic_index] = judged_data except Exception as e: logger.critical(f"为选题 {topic_index} 处理内容审核时发生意外错误: {e}", exc_info=True) logger.info(f"流水线完成,请求ID: {request_id}") return request_id, topics, contents, judged_contents