# Listing metadata from the repository viewer: 155 lines, 4.8 KiB, Python; revision dated 2025-07-10 17:51:37 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Text content API routes
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Dict, Any
from core.config import ConfigManager
from core.ai import AIAgent
from utils.file_io import OutputManager
from api.services.tweet import TweetService
from api.models.tweet import (
TopicRequest, TopicResponse,
ContentRequest, ContentResponse,
JudgeRequest, JudgeResponse,
PipelineRequest, PipelineResponse
)
# Import the dependency providers from main.py
from api.main import get_config, get_ai_agent, get_output_manager
logger = logging.getLogger(__name__)
# Create the router
router = APIRouter()
# Dependency-injection helper
def get_tweet_service(
    config_manager: ConfigManager = Depends(get_config),
    ai_agent: AIAgent = Depends(get_ai_agent),
    output_manager: OutputManager = Depends(get_output_manager)
) -> TweetService:
    """Build the text-content service used by the route handlers.

    Each argument is supplied through FastAPI's ``Depends`` mechanism
    (``get_config``, ``get_ai_agent`` and ``get_output_manager``).

    Returns:
        A ``TweetService`` constructed from the AI agent, the configuration
        manager and the output manager, in that positional order.
    """
    service = TweetService(ai_agent, config_manager, output_manager)
    return service
@router.post("/topics", response_model=TopicResponse, summary="生成选题")
async def generate_topics(
    request: TopicRequest,
    tweet_service: TweetService = Depends(get_tweet_service)
) -> TopicResponse:
    """
    Generate topics.

    - **date**: topic date, formatted as YYYY-MM-DD
    - **num_topics**: number of topics to generate
    - **style**: content style, e.g. '旅游攻略' (travel guide) or '亲子游' (family trip)
    - **target_audience**: target audience, e.g. '年轻人' (young people) or '家庭' (families)

    Responds with the request id and the generated topics. Unexpected
    failures are logged and reported as HTTP 500.
    """
    try:
        request_id, topics = await tweet_service.generate_topics(
            date=request.date,
            num_topics=request.num_topics,
            style=request.style,
            target_audience=request.target_audience,
        )
        return TopicResponse(
            request_id=request_id,
            topics=topics,
        )
    except HTTPException:
        # An HTTP error raised on purpose further down must keep its own
        # status code instead of being rewritten as a generic 500.
        raise
    except Exception as e:
        logger.error(f"生成选题失败: {e}", exc_info=True)
        # NOTE(review): the exception text is echoed to the client in `detail`;
        # confirm that exposing it is acceptable.
        # `from e` keeps the original failure attached as the cause.
        raise HTTPException(status_code=500, detail=f"生成选题失败: {str(e)}") from e
@router.post("/content", response_model=ContentResponse, summary="生成内容")
async def generate_content(
    request: ContentRequest,
    tweet_service: TweetService = Depends(get_tweet_service)
) -> ContentResponse:
    """
    Generate content for a topic.

    - **topic**: topic information

    Responds with the request id, the topic index and the generated content.
    Unexpected failures are logged and reported as HTTP 500.
    """
    try:
        request_id, topic_index, content = await tweet_service.generate_content(
            topic=request.topic,
        )
        return ContentResponse(
            request_id=request_id,
            topic_index=topic_index,
            content=content,
        )
    except HTTPException:
        # An HTTP error raised on purpose further down must keep its own
        # status code instead of being rewritten as a generic 500.
        raise
    except Exception as e:
        logger.error(f"生成内容失败: {e}", exc_info=True)
        # NOTE(review): the exception text is echoed to the client in `detail`;
        # confirm that exposing it is acceptable.
        # `from e` keeps the original failure attached as the cause.
        raise HTTPException(status_code=500, detail=f"生成内容失败: {str(e)}") from e
@router.post("/judge", response_model=JudgeResponse, summary="审核内容")
async def judge_content(
    request: JudgeRequest,
    tweet_service: TweetService = Depends(get_tweet_service)
) -> JudgeResponse:
    """
    Review content.

    - **topic**: topic information
    - **content**: the content to review

    Responds with the request id, the topic index, the reviewed content and
    a flag telling whether the review succeeded. Unexpected failures are
    logged and reported as HTTP 500.
    """
    try:
        (
            request_id,
            topic_index,
            judged_content,
            judge_success,
        ) = await tweet_service.judge_content(
            topic=request.topic,
            content=request.content,
        )
        return JudgeResponse(
            request_id=request_id,
            topic_index=topic_index,
            judged_content=judged_content,
            judge_success=judge_success,
        )
    except HTTPException:
        # An HTTP error raised on purpose further down must keep its own
        # status code instead of being rewritten as a generic 500.
        raise
    except Exception as e:
        logger.error(f"审核内容失败: {e}", exc_info=True)
        # NOTE(review): the exception text is echoed to the client in `detail`;
        # confirm that exposing it is acceptable.
        # `from e` keeps the original failure attached as the cause.
        raise HTTPException(status_code=500, detail=f"审核内容失败: {str(e)}") from e
@router.post("/pipeline", response_model=PipelineResponse, summary="运行完整流水线")
async def run_pipeline(
    request: PipelineRequest,
    tweet_service: TweetService = Depends(get_tweet_service)
) -> PipelineResponse:
    """
    Run the full pipeline: topic generation, content generation and content review.

    - **date**: topic date, formatted as YYYY-MM-DD
    - **num_topics**: number of topics to generate
    - **style**: content style, e.g. '旅游攻略' (travel guide) or '亲子游' (family trip)
    - **target_audience**: target audience, e.g. '年轻人' (young people) or '家庭' (families)
    - **skip_judge**: whether to skip the content review step

    Responds with the request id, the topics, the generated contents and the
    reviewed contents. Unexpected failures are logged and reported as HTTP 500.
    """
    try:
        (
            request_id,
            topics,
            contents,
            judged_contents,
        ) = await tweet_service.run_pipeline(
            date=request.date,
            num_topics=request.num_topics,
            style=request.style,
            target_audience=request.target_audience,
            skip_judge=request.skip_judge,
        )
        return PipelineResponse(
            request_id=request_id,
            topics=topics,
            contents=contents,
            judged_contents=judged_contents,
        )
    except HTTPException:
        # An HTTP error raised on purpose further down must keep its own
        # status code instead of being rewritten as a generic 500.
        raise
    except Exception as e:
        logger.error(f"运行流水线失败: {e}", exc_info=True)
        # NOTE(review): the exception text is echoed to the client in `detail`;
        # confirm that exposing it is acceptable.
        # `from e` keeps the original failure attached as the cause.
        raise HTTPException(status_code=500, detail=f"运行流水线失败: {str(e)}") from e