#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Web Crawling Router
Crawler analysis routes - API v2
"""
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from ..models import (
    XHSSearchRequest,
    KeywordAnalysisRequest,
    ContentAnalysisRequest,
    XHSSearchResponse,
    KeywordAnalysisResponse,
    ContentAnalysisResponse,
    ApiResponse
)

logger = logging.getLogger(__name__)
router = APIRouter()

# Resolve the crawling pipeline dependency from api_v2.main once here, instead of
# repeating the inline __import__ expression in every endpoint signature. The
# __import__ form is kept from the original code so the timing of the import of
# api_v2.main (at load of this module) is unchanged.
_get_crawling_pipeline = __import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline

@router.post("/xhs/search", response_model=XHSSearchResponse, summary="搜索小红书笔记")
async def search_xhs_notes(
request: XHSSearchRequest,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""
搜索小红书笔记
- **keyword**: 搜索关键词
- **max_notes**: 最大笔记数量 (1-100)
- **sort_type**: 排序类型 (0:综合, 1:最新, 2:最多点赞, 3:最多评论, 4:最多收藏)
- **note_type**: 笔记类型 (0:不限, 1:视频笔记, 2:普通笔记)
- **cookies**: Cookie字符串可选
"""
try:
logger.info(f"开始搜索小红书笔记,关键词: {request.keyword}")
# 获取小红书爬虫
xhs_crawler = pipeline["xhs_crawler"]
# 设置Cookies如果提供
if request.cookies:
xhs_crawler.set_cookies(request.cookies)
# 创建搜索配置
from travel_algorithms.web_crawling import SearchConfig
search_config = SearchConfig(
keyword=request.keyword,
max_notes=request.max_notes,
sort_type=request.sort_type,
note_type=request.note_type
)
# 执行搜索
search_result = xhs_crawler.search_notes(search_config)
logger.info(f"搜索完成,找到 {len(search_result.notes)} 条笔记")
return XHSSearchResponse(
success=search_result.success,
message="搜索完成" if search_result.success else "搜索失败",
data=search_result.to_dict(),
error=search_result.error_message if not search_result.success else None
)
except Exception as e:
error_msg = f"搜索失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=XHSSearchResponse(
success=False,
message="搜索失败",
error=error_msg
).dict()
)
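
# Illustrative request for the endpoint above (the path prefix depends on how this
# router is mounted in api_v2.main; the field names follow XHSSearchRequest):
#
#   POST .../xhs/search
#   {"keyword": "西湖", "max_notes": 20, "sort_type": 1, "note_type": 0}
#
# A successful response carries search_result.to_dict() under "data", with
# success/message/error populated as in the return statement above.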
@router.post("/xhs/batch_search", response_model=ApiResponse, summary="批量搜索关键词")
async def batch_search_xhs(
keywords: List[str],
max_notes_per_keyword: int = 10,
cookies: str = None,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""
批量搜索多个关键词
- **keywords**: 关键词列表
- **max_notes_per_keyword**: 每个关键词的最大笔记数
- **cookies**: Cookie字符串可选
"""
try:
logger.info(f"开始批量搜索,关键词数量: {len(keywords)}")
# 获取小红书爬虫
xhs_crawler = pipeline["xhs_crawler"]
# 设置Cookies如果提供
if cookies:
xhs_crawler.set_cookies(cookies)
# 批量搜索
results = xhs_crawler.batch_search(keywords, max_notes_per_keyword)
# 统计结果
total_notes = sum(len(result.notes) for result in results.values())
success_count = sum(1 for result in results.values() if result.success)
batch_data = {
"results": {k: v.to_dict() for k, v in results.items()},
"statistics": {
"total_keywords": len(keywords),
"successful_searches": success_count,
"total_notes_found": total_notes,
"success_rate": success_count / len(keywords) if keywords else 0
}
}
logger.info(f"批量搜索完成,成功 {success_count}/{len(keywords)} 个关键词")
return ApiResponse(
success=True,
message=f"批量搜索完成,找到 {total_notes} 条笔记",
data=batch_data
)
except Exception as e:
error_msg = f"批量搜索失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="批量搜索失败",
error=error_msg
).dict()
)
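
# Note on how FastAPI binds these parameters: keywords is a bare List[str] and is
# therefore read from the JSON request body, while max_notes_per_keyword and
# cookies are scalars and arrive as query parameters. An illustrative call:
#
#   POST .../xhs/batch_search?max_notes_per_keyword=10
#   ["西湖", "灵隐寺", "杭州美食"]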
@router.post("/keywords/analyze", response_model=KeywordAnalysisResponse, summary="分析关键词")
async def analyze_keywords(
request: KeywordAnalysisRequest,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""
分析内容中的关键词
- **content**: 待分析的内容
- **source_type**: 内容源类型
- **enable_ai_analysis**: 是否启用AI分析
"""
try:
logger.info(f"开始分析关键词,内容长度: {len(request.content)}")
# 获取关键词分析器
keyword_analyzer = pipeline["keyword_analyzer"]
# 分析关键词
analysis_result = await keyword_analyzer.analyze_content(
content=request.content,
source_type=request.source_type,
enable_ai_analysis=request.enable_ai_analysis
)
logger.info(f"关键词分析完成,提取到 {len(analysis_result.keywords)} 个关键词")
return KeywordAnalysisResponse(
success=True,
message="关键词分析完成",
data=analysis_result.to_dict()
)
except Exception as e:
error_msg = f"关键词分析失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=KeywordAnalysisResponse(
success=False,
message="关键词分析失败",
error=error_msg
).dict()
)
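
# Illustrative request body for the endpoint above (field names follow
# KeywordAnalysisRequest; "user_input" is only a placeholder, the accepted
# source_type values are defined by the request model and analyzer, not here):
#
#   POST .../keywords/analyze
#   {"content": "三天两夜的杭州行程……", "source_type": "user_input", "enable_ai_analysis": true}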
@router.post("/content/analyze", response_model=ContentAnalysisResponse, summary="分析内容质量")
async def analyze_content(
request: ContentAnalysisRequest,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""
分析内容质量和特征
- **search_result**: 搜索结果可选
- **notes**: 笔记列表可选
注意search_result和notes至少需要提供一个
"""
try:
logger.info("开始分析内容质量")
# 获取内容分析器
content_analyzer = pipeline["content_analyzer"]
# 准备笔记数据
notes_data = []
if request.search_result:
# 从搜索结果中提取笔记
from travel_algorithms.web_crawling import XHSNote
for note_dict in request.search_result.get("notes", []):
try:
note = XHSNote(**note_dict)
notes_data.append(note)
except Exception as e:
logger.warning(f"解析笔记失败: {e}")
continue
elif request.notes:
# 直接使用提供的笔记
from travel_algorithms.web_crawling import XHSNote
for note_dict in request.notes:
try:
note = XHSNote(**note_dict)
notes_data.append(note)
except Exception as e:
logger.warning(f"解析笔记失败: {e}")
continue
if not notes_data:
raise ValueError("没有有效的笔记数据可供分析")
# 分析内容
analysis_result = content_analyzer.analyze_xhs_notes(notes_data)
logger.info(f"内容分析完成,质量评分: {analysis_result.content_quality_score:.2f}")
return ContentAnalysisResponse(
success=True,
message="内容分析完成",
data=analysis_result.to_dict()
)
except Exception as e:
error_msg = f"内容分析失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ContentAnalysisResponse(
success=False,
message="内容分析失败",
error=error_msg
).dict()
)
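
# Illustrative request body for the endpoint above (either key may be supplied;
# the note objects must match the XHSNote fields expected by the crawling module):
#
#   POST .../content/analyze
#   {"notes": [{...}, {...}]}
#
# or, reusing an earlier /xhs/search response:
#
#   {"search_result": {"notes": [{...}, ...], ...}}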
@router.post("/search_and_analyze", response_model=ApiResponse, summary="搜索并分析内容")
async def search_and_analyze(
keyword: str,
max_notes: int = 20,
analyze_keywords: bool = True,
analyze_content: bool = True,
cookies: str = None,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""
一站式搜索并分析内容
- **keyword**: 搜索关键词
- **max_notes**: 最大笔记数量
- **analyze_keywords**: 是否分析关键词
- **analyze_content**: 是否分析内容质量
- **cookies**: Cookie字符串可选
"""
try:
logger.info(f"开始搜索并分析,关键词: {keyword}")
# 获取组件
xhs_crawler = pipeline["xhs_crawler"]
keyword_analyzer = pipeline["keyword_analyzer"] if analyze_keywords else None
content_analyzer = pipeline["content_analyzer"] if analyze_content else None
# 设置Cookies
if cookies:
xhs_crawler.set_cookies(cookies)
# 1. 搜索笔记
from travel_algorithms.web_crawling import SearchConfig
search_config = SearchConfig(
keyword=keyword,
max_notes=max_notes
)
search_result = xhs_crawler.search_notes(search_config)
if not search_result.success:
raise Exception(f"搜索失败: {search_result.error_message}")
result_data = {
"search_result": search_result.to_dict(),
"keyword_analysis": None,
"content_analysis": None
}
# 2. 关键词分析(可选)
if analyze_keywords and keyword_analyzer:
try:
# 合并所有笔记内容进行关键词分析
all_content = " ".join([
f"{note.title} {note.content}" for note in search_result.notes
])
keyword_result = await keyword_analyzer.analyze_content(
content=all_content,
source_type="xhs_notes",
enable_ai_analysis=True
)
result_data["keyword_analysis"] = keyword_result.to_dict()
except Exception as e:
logger.warning(f"关键词分析失败: {e}")
result_data["keyword_analysis"] = {"error": str(e)}
# 3. 内容质量分析(可选)
if analyze_content and content_analyzer and search_result.notes:
try:
content_result = content_analyzer.analyze_xhs_notes(search_result.notes)
result_data["content_analysis"] = content_result.to_dict()
except Exception as e:
logger.warning(f"内容分析失败: {e}")
result_data["content_analysis"] = {"error": str(e)}
logger.info(f"搜索并分析完成,找到 {len(search_result.notes)} 条笔记")
return ApiResponse(
success=True,
message="搜索并分析完成",
data=result_data
)
except Exception as e:
error_msg = f"搜索并分析失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="搜索并分析失败",
error=error_msg
).dict()
)
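
# Because none of the parameters above are Pydantic models, FastAPI reads them all
# as query parameters, so an illustrative call needs no request body:
#
#   POST .../search_and_analyze?keyword=西湖&max_notes=20&analyze_keywords=true&analyze_content=false
#
# The response's "data" field mirrors result_data above: the raw search_result plus
# keyword_analysis / content_analysis, each None when skipped or {"error": ...}
# when that stage fails.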
@router.get("/pipeline/stats", response_model=ApiResponse, summary="获取流水线统计")
async def get_pipeline_stats(
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
"""获取爬虫分析流水线的统计信息"""
try:
stats = {
"xhs_crawler": pipeline["xhs_crawler"].get_crawler_stats(),
"keyword_analyzer": pipeline["keyword_analyzer"].get_analyzer_stats(),
"content_analyzer": pipeline["content_analyzer"].get_analyzer_stats(),
"config": {
"web_crawling": pipeline["config"].web_crawling.dict(),
"keyword_analysis": pipeline["config"].keyword_analysis.dict(),
"content_analysis": pipeline["config"].content_analysis.dict()
}
}
return ApiResponse(
success=True,
message="统计信息获取成功",
data=stats
)
except Exception as e:
error_msg = f"获取统计信息失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="获取统计信息失败",
error=error_msg
).dict()
)