#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Web Crawling Router - crawler analysis routes for API v2
"""

import logging
from typing import Dict, Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse

from ..models import (
    XHSSearchRequest, KeywordAnalysisRequest, ContentAnalysisRequest,
    XHSSearchResponse, KeywordAnalysisResponse, ContentAnalysisResponse,
    ApiResponse
)

logger = logging.getLogger(__name__)
router = APIRouter()

# Crawling-pipeline dependency provided by api_v2.main. The dynamic __import__
# (equivalent to "from api_v2.main import get_crawling_pipeline") is kept from
# the original code, but resolved once here instead of in every route signature.
get_crawling_pipeline = __import__(
    'api_v2.main', fromlist=['get_crawling_pipeline']
).get_crawling_pipeline


@router.post("/xhs/search", response_model=XHSSearchResponse, summary="Search Xiaohongshu notes")
async def search_xhs_notes(
    request: XHSSearchRequest,
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """
    Search Xiaohongshu (XHS) notes.

    - **keyword**: search keyword
    - **max_notes**: maximum number of notes (1-100)
    - **sort_type**: sort order (0: general, 1: latest, 2: most liked, 3: most commented, 4: most collected)
    - **note_type**: note type (0: any, 1: video notes, 2: regular notes)
    - **cookies**: cookie string (optional)
    """
    try:
        logger.info(f"Starting XHS note search, keyword: {request.keyword}")

        # Get the XHS crawler
        xhs_crawler = pipeline["xhs_crawler"]

        # Set cookies if provided
        if request.cookies:
            xhs_crawler.set_cookies(request.cookies)

        # Build the search configuration
        from travel_algorithms.web_crawling import SearchConfig
        search_config = SearchConfig(
            keyword=request.keyword,
            max_notes=request.max_notes,
            sort_type=request.sort_type,
            note_type=request.note_type
        )

        # Run the search
        search_result = xhs_crawler.search_notes(search_config)

        logger.info(f"Search finished, found {len(search_result.notes)} notes")

        return XHSSearchResponse(
            success=search_result.success,
            message="Search completed" if search_result.success else "Search failed",
            data=search_result.to_dict(),
            error=search_result.error_message if not search_result.success else None
        )

    except Exception as e:
        error_msg = f"Search failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=XHSSearchResponse(
                success=False,
                message="Search failed",
                error=error_msg
            ).dict()
        )
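
# Illustrative client call for POST /xhs/search above (comment only, not executed).
# The request fields mirror XHSSearchRequest as documented in the endpoint; the
# "/api/v2/crawling" prefix and the port are assumptions about how api_v2.main
# mounts this router.
#
#   import httpx
#   payload = {"keyword": "杭州美食", "max_notes": 20, "sort_type": 0, "note_type": 0}
#   resp = httpx.post("http://localhost:8000/api/v2/crawling/xhs/search", json=payload)
#   print(resp.json()["data"])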


@router.post("/xhs/batch_search", response_model=ApiResponse, summary="Batch search keywords")
async def batch_search_xhs(
    keywords: List[str],
    max_notes_per_keyword: int = 10,
    cookies: Optional[str] = None,
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """
    Batch search multiple keywords.

    - **keywords**: list of keywords
    - **max_notes_per_keyword**: maximum number of notes per keyword
    - **cookies**: cookie string (optional)
    """
    try:
        logger.info(f"Starting batch search, number of keywords: {len(keywords)}")

        # Get the XHS crawler
        xhs_crawler = pipeline["xhs_crawler"]

        # Set cookies if provided
        if cookies:
            xhs_crawler.set_cookies(cookies)

        # Batch search
        results = xhs_crawler.batch_search(keywords, max_notes_per_keyword)

        # Aggregate the results
        total_notes = sum(len(result.notes) for result in results.values())
        success_count = sum(1 for result in results.values() if result.success)

        batch_data = {
            "results": {k: v.to_dict() for k, v in results.items()},
            "statistics": {
                "total_keywords": len(keywords),
                "successful_searches": success_count,
                "total_notes_found": total_notes,
                "success_rate": success_count / len(keywords) if keywords else 0
            }
        }

        logger.info(f"Batch search finished, {success_count}/{len(keywords)} keywords succeeded")

        return ApiResponse(
            success=True,
            message=f"Batch search completed, found {total_notes} notes",
            data=batch_data
        )

    except Exception as e:
        error_msg = f"Batch search failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Batch search failed",
                error=error_msg
            ).dict()
        )


@router.post("/keywords/analyze", response_model=KeywordAnalysisResponse, summary="Analyze keywords")
async def analyze_keywords(
    request: KeywordAnalysisRequest,
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """
    Analyze keywords in a piece of content.

    - **content**: content to analyze
    - **source_type**: type of the content source
    - **enable_ai_analysis**: whether to enable AI analysis
    """
    try:
        logger.info(f"Starting keyword analysis, content length: {len(request.content)}")

        # Get the keyword analyzer
        keyword_analyzer = pipeline["keyword_analyzer"]

        # Analyze keywords
        analysis_result = await keyword_analyzer.analyze_content(
            content=request.content,
            source_type=request.source_type,
            enable_ai_analysis=request.enable_ai_analysis
        )

        logger.info(f"Keyword analysis finished, extracted {len(analysis_result.keywords)} keywords")

        return KeywordAnalysisResponse(
            success=True,
            message="Keyword analysis completed",
            data=analysis_result.to_dict()
        )

    except Exception as e:
        error_msg = f"Keyword analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=KeywordAnalysisResponse(
                success=False,
                message="Keyword analysis failed",
                error=error_msg
            ).dict()
        )


@router.post("/content/analyze", response_model=ContentAnalysisResponse, summary="Analyze content quality")
async def analyze_content(
    request: ContentAnalysisRequest,
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """
    Analyze content quality and characteristics.

    - **search_result**: search result (optional)
    - **notes**: list of notes (optional)

    Note: at least one of search_result and notes must be provided.
    """
    try:
        logger.info("Starting content quality analysis")

        # Get the content analyzer
        content_analyzer = pipeline["content_analyzer"]

        # Prepare note data
        notes_data = []

        from travel_algorithms.web_crawling import XHSNote

        if request.search_result:
            # Extract notes from the search result
            for note_dict in request.search_result.get("notes", []):
                try:
                    note = XHSNote(**note_dict)
                    notes_data.append(note)
                except Exception as e:
                    logger.warning(f"Failed to parse note: {e}")
                    continue

        elif request.notes:
            # Use the provided notes directly
            for note_dict in request.notes:
                try:
                    note = XHSNote(**note_dict)
                    notes_data.append(note)
                except Exception as e:
                    logger.warning(f"Failed to parse note: {e}")
                    continue

        if not notes_data:
            raise ValueError("No valid note data available for analysis")

        # Analyze the content
        analysis_result = content_analyzer.analyze_xhs_notes(notes_data)

        logger.info(f"Content analysis finished, quality score: {analysis_result.content_quality_score:.2f}")

        return ContentAnalysisResponse(
            success=True,
            message="Content analysis completed",
            data=analysis_result.to_dict()
        )

    except Exception as e:
        error_msg = f"Content analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ContentAnalysisResponse(
                success=False,
                message="Content analysis failed",
                error=error_msg
            ).dict()
        )
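
# Illustrative request body for POST /content/analyze above (comment only, not
# executed). At least one of "search_result" and "notes" must be supplied; each
# note dict is passed to XHSNote(**note_dict), so its fields must match the
# XHSNote model in travel_algorithms.web_crawling (the field names shown below
# are assumptions, not the model's definitive schema).
#
#   {
#     "notes": [
#       {"title": "西湖一日游", "content": "断桥 -> 白堤 -> 苏堤", "liked_count": 1200}
#     ]
#   }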


@router.post("/search_and_analyze", response_model=ApiResponse, summary="Search and analyze content")
async def search_and_analyze(
    keyword: str,
    max_notes: int = 20,
    analyze_keywords: bool = True,
    analyze_content: bool = True,
    cookies: Optional[str] = None,
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """
    One-stop search and content analysis.

    - **keyword**: search keyword
    - **max_notes**: maximum number of notes
    - **analyze_keywords**: whether to analyze keywords
    - **analyze_content**: whether to analyze content quality
    - **cookies**: cookie string (optional)
    """
    try:
        logger.info(f"Starting search and analysis, keyword: {keyword}")

        # Get the pipeline components
        xhs_crawler = pipeline["xhs_crawler"]
        keyword_analyzer = pipeline["keyword_analyzer"] if analyze_keywords else None
        content_analyzer = pipeline["content_analyzer"] if analyze_content else None

        # Set cookies if provided
        if cookies:
            xhs_crawler.set_cookies(cookies)

        # 1. Search notes
        from travel_algorithms.web_crawling import SearchConfig
        search_config = SearchConfig(
            keyword=keyword,
            max_notes=max_notes
        )
        search_result = xhs_crawler.search_notes(search_config)

        if not search_result.success:
            raise Exception(f"Search failed: {search_result.error_message}")

        result_data = {
            "search_result": search_result.to_dict(),
            "keyword_analysis": None,
            "content_analysis": None
        }

        # 2. Keyword analysis (optional)
        if analyze_keywords and keyword_analyzer:
            try:
                # Merge all note content for keyword analysis
                all_content = " ".join([
                    f"{note.title} {note.content}"
                    for note in search_result.notes
                ])

                keyword_result = await keyword_analyzer.analyze_content(
                    content=all_content,
                    source_type="xhs_notes",
                    enable_ai_analysis=True
                )
                result_data["keyword_analysis"] = keyword_result.to_dict()

            except Exception as e:
                logger.warning(f"Keyword analysis failed: {e}")
                result_data["keyword_analysis"] = {"error": str(e)}

        # 3. Content quality analysis (optional)
        if analyze_content and content_analyzer and search_result.notes:
            try:
                content_result = content_analyzer.analyze_xhs_notes(search_result.notes)
                result_data["content_analysis"] = content_result.to_dict()

            except Exception as e:
                logger.warning(f"Content analysis failed: {e}")
                result_data["content_analysis"] = {"error": str(e)}

        logger.info(f"Search and analysis finished, found {len(search_result.notes)} notes")

        return ApiResponse(
            success=True,
            message="Search and analysis completed",
            data=result_data
        )

    except Exception as e:
        error_msg = f"Search and analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Search and analysis failed",
                error=error_msg
            ).dict()
        )


@router.get("/pipeline/stats", response_model=ApiResponse, summary="Get pipeline statistics")
async def get_pipeline_stats(
    pipeline: Dict[str, Any] = Depends(get_crawling_pipeline)
):
    """Return statistics for the crawling and analysis pipeline."""
    try:
        stats = {
            "xhs_crawler": pipeline["xhs_crawler"].get_crawler_stats(),
            "keyword_analyzer": pipeline["keyword_analyzer"].get_analyzer_stats(),
            "content_analyzer": pipeline["content_analyzer"].get_analyzer_stats(),
            "config": {
                "web_crawling": pipeline["config"].web_crawling.dict(),
                "keyword_analysis": pipeline["config"].keyword_analysis.dict(),
                "content_analysis": pipeline["config"].content_analysis.dict()
            }
        }

        return ApiResponse(
            success=True,
            message="Statistics retrieved successfully",
            data=stats
        )

    except Exception as e:
        error_msg = f"Failed to retrieve statistics: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Failed to retrieve statistics",
                error=error_msg
            ).dict()
        )
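

# Illustrative call for POST /search_and_analyze above (comment only, not executed).
# Because keyword, max_notes, analyze_keywords, analyze_content and cookies are
# declared as plain scalar parameters, FastAPI exposes them as query parameters
# rather than a JSON body; the "/api/v2/crawling" prefix is an assumption about
# how api_v2.main mounts this router.
#
#   POST /api/v2/crawling/search_and_analyze?keyword=杭州&max_notes=20&analyze_keywords=true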