#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content integration API routes.

Provides the HTTP endpoints for integrating Xiaohongshu (XHS) content with
documents.
"""

import logging
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Query
from pydantic import BaseModel, Field

from api.services.integration_service import IntegrationService
from api.dependencies import get_integration_service
from api.models.integration import (
    XHSSearchRequest, IntegrationRequest, BatchIntegrationRequest,
    CookieManagementRequest, ExportRequest,
    XHSSearchResponse, IntegrationResponse, CookieStatsResponse,
    ServiceStatusResponse, HealthCheckResponse, TaskSummaryResponse,
    ValidationResponse, ApiResponse, BatchSearchRequest, BatchSearchResponse
)

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/integration",
    tags=["integration"],
    responses={404: {"description": "Not found"}},
)

# NOTE(review): the handlers in this section are ``async def`` but call the
# IntegrationService synchronously (no ``await``). If those service methods
# perform blocking I/O (network scraping, file processing), they stall the
# event loop for every concurrent request. Consider plain ``def`` handlers
# (FastAPI runs them in a threadpool) once the service is confirmed to be
# thread-safe.
#
# NOTE(review): the 500 responses below return ``str(e)`` to the client, which
# may expose internal details; kept as-is because clients may rely on it.


# ============================================================================
# Health check and status endpoints
# ============================================================================

@router.get("/status", response_model=ServiceStatusResponse, summary="获取服务状态")
async def get_service_status(
    service: IntegrationService = Depends(get_integration_service)
):
    """Return the integration service's status report.

    Raises:
        HTTPException: 500 if ``service.get_service_status()`` raises.
    """
    try:
        return service.get_service_status()
    except Exception as e:
        # logger.exception logs at ERROR level *with* the traceback.
        logger.exception(f"获取服务状态失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================================================
# XHS search endpoints
# ============================================================================

@router.post("/search", response_model=XHSSearchResponse, summary="搜索小红书笔记")
async def search_xhs_notes(
    request: XHSSearchRequest,
    service: IntegrationService = Depends(get_integration_service)
):
    """Search XHS notes for ``request.keyword`` through the service.

    A result with ``success=False`` is logged as a warning and still returned
    to the client (HTTP 200). Only an exception raised while searching or
    inspecting the result is turned into an error response.

    Raises:
        HTTPException: 500 if the search raises.
    """
    try:
        logger.info(f"搜索小红书笔记: {request.keyword}")
        result = service.search_xhs_notes(request)

        if result.success:
            logger.info(f"搜索成功,找到 {result.total_count} 条笔记")
        else:
            logger.warning(f"搜索失败: {result.error_message}")

        return result
    except Exception as e:
        logger.exception(f"搜索失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================================================
# Content integration endpoints
# ============================================================================

@router.post("/integrate", response_model=IntegrationResponse, summary="整合内容")
async def integrate_content(
    request: IntegrationRequest,
    service: IntegrationService = Depends(get_integration_service)
):
    """Run a single integration task for ``request.keyword``.

    A result with ``success=False`` is logged as a warning and returned as-is
    (HTTP 200). On success the generated ``task_id`` is logged.

    Raises:
        HTTPException: 500 if the integration raises.
    """
    try:
        logger.info(f"开始整合内容: {request.keyword}")
        result = service.integrate_content(request)

        if result.success:
            logger.info(f"整合成功,任务ID: {result.task_id}")
        else:
            logger.warning(f"整合失败: {result.error_message}")

        return result
    except Exception as e:
        logger.exception(f"内容整合失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/integrate/batch", response_model=Dict[str, IntegrationResponse], summary="批量整合内容")
async def batch_integrate_content(
    request: BatchIntegrationRequest,
    background_tasks: BackgroundTasks,
    service: IntegrationService = Depends(get_integration_service)
):
    """Run every task in ``request.tasks`` and return the per-key results.

    The whole batch runs synchronously within this request; the returned
    mapping comes straight from ``service.batch_integrate_content``.

    Args:
        request: The batch of integration tasks.
        background_tasks: Currently unused; kept so the handler signature
            stays unchanged.
        service: Injected integration service.

    Raises:
        HTTPException: 500 if the batch run raises.
    """
    try:
        logger.info(f"开始批量整合,共 {len(request.tasks)} 个任务")
        results = service.batch_integrate_content(request)

        success_count = sum(1 for r in results.values() if r.success)
        logger.info(f"批量整合完成,成功 {success_count}/{len(results)} 个任务")

        return results
    except Exception as e:
        logger.exception(f"批量整合失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================================================
# Result management endpoints
# ============================================================================

@router.get("/results", response_model=List[TaskSummaryResponse], summary="列出所有整合结果")
async def list_integration_results(
    service: IntegrationService = Depends(get_integration_service)
):
    """Return summaries of all stored integration results.

    Raises:
        HTTPException: 500 if listing raises.
    """
    try:
        results = service.list_integration_results()
        logger.info(f"找到 {len(results)} 个整合结果")
        return results
    except Exception as e:
        logger.exception(f"列出结果失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/results/{task_id}", response_model=IntegrationResponse, summary="获取指定的整合结果")
async def get_integration_result(
    task_id: str,
    service: IntegrationService = Depends(get_integration_service)
):
    """Return the stored integration result for ``task_id``.

    Raises:
        HTTPException: 404 if the service returns a falsy result; 500 if the
            lookup raises.
    """
    try:
        result = service.get_integration_result(task_id)
    except Exception as e:
        # logger.exception logs at ERROR level *with* the traceback.
        logger.exception(f"获取结果失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not result:
        raise HTTPException(status_code=404, detail="结果不存在")
    return result


@router.post("/results/export", response_model=ApiResponse, summary="导出结果")
async def export_result(
    request: ExportRequest,
    service: IntegrationService = Depends(get_integration_service)
):
    """Export a stored integration result through ``service.export_result``.

    Returns:
        ApiResponse with ``data={"output_file": ...}`` on success. If the
        service returns a falsy value, returns ``success=False`` (HTTP 200)
        with a hint to check the task ID.

    Raises:
        HTTPException: 500 if the export raises.
    """
    try:
        output_file = service.export_result(request)
    except Exception as e:
        logger.exception(f"导出失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not output_file:
        return ApiResponse(
            success=False,
            message="导出失败,请检查任务ID是否正确",
            data=None
        )
    return ApiResponse(
        success=True,
        message="导出成功",
        data={"output_file": output_file}
    )


# ============================================================================
# Cookie management endpoints
# ============================================================================

@router.post("/cookies", response_model=ApiResponse, summary="添加Cookie")
async def add_cookie(
    request: CookieManagementRequest,
    service: IntegrationService = Depends(get_integration_service)
):
    """Add a cookie through the service.

    The response's ``success`` mirrors the service's return value. The
    failure message suggests the cookie may already exist.

    Raises:
        HTTPException: 500 if the service raises.
    """
    try:
        success = service.add_cookie(request)
        return ApiResponse(
            success=success,
            message="Cookie添加成功" if success else "Cookie添加失败(可能已存在)",
            data={"cookie_name": request.name}
        )
    except Exception as e:
        logger.exception(f"添加Cookie失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/cookies/{cookie_name}", response_model=ApiResponse, summary="删除Cookie")
async def remove_cookie(
    cookie_name: str,
    service: IntegrationService = Depends(get_integration_service)
):
    """Remove the cookie named ``cookie_name`` through the service.

    The response's ``success`` mirrors the service's return value. The
    failure message suggests the cookie may not exist.

    Raises:
        HTTPException: 500 if the service raises.
    """
    try:
        success = service.remove_cookie(cookie_name)
        return ApiResponse(
            success=success,
            message="Cookie删除成功" if success else "Cookie删除失败(可能不存在)",
            data={"cookie_name": cookie_name}
        )
    except Exception as e:
        logger.exception(f"删除Cookie失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/cookies", response_model=CookieStatsResponse, summary="获取Cookie统计")
async def get_cookie_stats(
    service: IntegrationService = Depends(get_integration_service)
):
    """Return the service's cookie statistics.

    Raises:
        HTTPException: 500 if the service raises.
    """
    try:
        return service.get_cookie_stats()
    except Exception as e:
        logger.exception(f"获取Cookie统计失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================================================
# Utility endpoints
# ============================================================================

@router.get("/formats/document", response_model=ApiResponse, summary="获取支持的文档格式")
async def get_supported_document_formats(
    service: IntegrationService = Depends(get_integration_service)
):
    """List the input document formats reported by the service's document adapter.

    Raises:
        HTTPException: 500 if the adapter raises.
    """
    try:
        formats = service.document_adapter.get_supported_formats()
        return ApiResponse(
            success=True,
            message="获取支持格式成功",
            data={"supported_formats": formats}
        )
    except Exception as e:
        logger.exception(f"获取支持格式失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/formats/output", response_model=ApiResponse, summary="获取支持的输出格式")
async def get_supported_output_formats(
    service: IntegrationService = Depends(get_integration_service)
):
    """List the output formats reported by the service's document adapter.

    Raises:
        HTTPException: 500 if the adapter raises.
    """
    try:
        formats = service.document_adapter.get_supported_output_formats()
        return ApiResponse(
            success=True,
            message="获取输出格式成功",
            data={"output_formats": formats}
        )
    except Exception as e:
        logger.exception(f"获取输出格式失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/validate/documents", response_model=ValidationResponse, summary="验证文档")
async def validate_documents(
    document_paths: List[str],
    service: IntegrationService = Depends(get_integration_service)
):
    """Validate each path in ``document_paths`` and summarize the outcome.

    ``service.validate_documents`` returns a mapping of path -> validity. A
    path counts as invalid when its value is falsy.

    NOTE(review): the paths come straight from the client and refer to the
    server's filesystem. Confirm the service restricts them to an allowed
    root, otherwise this can be used to probe arbitrary files.

    Raises:
        HTTPException: 500 if validation raises.
    """
    try:
        validation_results = service.validate_documents(document_paths)

        # One pass: the invalid list also yields both counts.
        invalid_documents = [
            path for path, valid in validation_results.items() if not valid
        ]
        invalid_count = len(invalid_documents)
        valid_count = len(validation_results) - invalid_count

        return ValidationResponse(
            validation_results=validation_results,
            valid_count=valid_count,
            invalid_count=invalid_count,
            invalid_documents=invalid_documents
        )
    except Exception as e:
        logger.exception(f"验证文档失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


# ============================================================================
# Quick-action endpoints
# ============================================================================

@router.get("/quick", response_model=ApiResponse, summary="快速整合")
async def quick_integration(
    keyword: str = Query(..., description="搜索关键词"),
    document_paths: List[str] = Query(..., description="文档路径列表"),
    output_format: str = Query(default="summary", description="输出格式"),
    service: IntegrationService = Depends(get_integration_service)
):
    """Integrate and export in one call.

    Builds an ``IntegrationRequest`` from the query parameters, runs it, then
    exports the result using the same ``output_format``.

    Returns:
        An ApiResponse in one of three shapes, all HTTP 200:
        - integration failure: ``success=False``, ``data=None``;
        - export failure: ``success=False``, ``data`` holds only the task ID;
        - success: ``task_id``, ``output_file``, ``processing_time`` and
          note/document counts.

    NOTE(review): this is a GET with side effects (it creates and exports a
    task), and ``document_paths`` are client-supplied server paths. Both are
    worth revisiting; they are kept unchanged for client compatibility.

    Raises:
        HTTPException: 500 if integration or export raises.
    """
    try:
        logger.info(f"快速整合: {keyword}")

        integration_request = IntegrationRequest(
            keyword=keyword,
            document_paths=document_paths,
            output_format=output_format
        )
        integration_result = service.integrate_content(integration_request)

        if not integration_result.success:
            return ApiResponse(
                success=False,
                message=f"整合失败: {integration_result.error_message}",
                data=None
            )

        export_request = ExportRequest(
            task_id=integration_result.task_id,
            output_format=output_format
        )
        output_file = service.export_result(export_request)

        if not output_file:
            return ApiResponse(
                success=False,
                message="整合成功但导出失败",
                data={"task_id": integration_result.task_id}
            )

        return ApiResponse(
            success=True,
            message="快速整合完成",
            data={
                "task_id": integration_result.task_id,
                "output_file": output_file,
                "processing_time": integration_result.processing_time,
                "notes_count": len(integration_result.xhs_content.notes) if integration_result.xhs_content else 0,
                "documents_count": len(integration_result.document_content.documents) if integration_result.document_content else 0
            }
        )
    except Exception as e:
        logger.exception(f"快速整合失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/batch-search", response_model=BatchSearchResponse, summary="批量搜索小红书笔记")
async def batch_search(
    request: BatchSearchRequest,
    service: IntegrationService = Depends(get_integration_service)
):
    """Search XHS notes for every keyword in ``request.keywords``.

    Unlike the other handlers, this awaits ``service.batch_search``. The
    original docstring describes support for custom cookie configuration,
    batch keyword search, image download and storage, detail fetching and
    structured output. Those features live in the service, not here.

    Raises:
        HTTPException: 500 (detail prefixed with "批量搜索失败") if the search
            raises.
    """
    try:
        logger.info(f"开始批量搜索,关键词数量: {len(request.keywords)}")
        result = await service.batch_search(request)
        logger.info(f"批量搜索完成,总计 {result.total_notes} 条笔记")
        return result
    except Exception as e:
        logger.exception(f"批量搜索失败: {e}")
        raise HTTPException(status_code=500, detail=f"批量搜索失败: {str(e)}") from e