#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Web Crawling Router

Crawler analysis routes - API v2
"""

import logging
from typing import Dict, Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse

from ..models import (
    XHSSearchRequest,
    KeywordAnalysisRequest,
    ContentAnalysisRequest,
    XHSSearchResponse,
    KeywordAnalysisResponse,
    ContentAnalysisResponse,
    ApiResponse
)

logger = logging.getLogger(__name__)

router = APIRouter()


@router.post("/xhs/search", response_model=XHSSearchResponse, summary="搜索小红书笔记")
|
||
async def search_xhs_notes(
|
||
request: XHSSearchRequest,
|
||
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
|
||
):
|
||
"""
|
||
搜索小红书笔记
|
||
|
||
- **keyword**: 搜索关键词
|
||
- **max_notes**: 最大笔记数量 (1-100)
|
||
- **sort_type**: 排序类型 (0:综合, 1:最新, 2:最多点赞, 3:最多评论, 4:最多收藏)
|
||
- **note_type**: 笔记类型 (0:不限, 1:视频笔记, 2:普通笔记)
|
||
- **cookies**: Cookie字符串(可选)
|
||
"""
    try:
        logger.info(f"Starting XHS note search, keyword: {request.keyword}")

        # Get the XHS crawler
        xhs_crawler = pipeline["xhs_crawler"]

        # Apply cookies if provided
        if request.cookies:
            xhs_crawler.set_cookies(request.cookies)

        # Build the search configuration
        from travel_algorithms.web_crawling import SearchConfig
        search_config = SearchConfig(
            keyword=request.keyword,
            max_notes=request.max_notes,
            sort_type=request.sort_type,
            note_type=request.note_type
        )

        # Run the search
        search_result = xhs_crawler.search_notes(search_config)

        logger.info(f"Search finished, found {len(search_result.notes)} notes")

        return XHSSearchResponse(
            success=search_result.success,
            message="Search completed" if search_result.success else "Search failed",
            data=search_result.to_dict(),
            error=search_result.error_message if not search_result.success else None
        )

    except Exception as e:
        error_msg = f"Search failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=XHSSearchResponse(
                success=False,
                message="Search failed",
                error=error_msg
            ).dict()
        )


@router.post("/xhs/batch_search", response_model=ApiResponse, summary="批量搜索关键词")
|
||
async def batch_search_xhs(
|
||
keywords: List[str],
|
||
max_notes_per_keyword: int = 10,
|
||
cookies: str = None,
|
||
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
|
||
):
|
||
"""
|
||
批量搜索多个关键词
|
||
|
||
- **keywords**: 关键词列表
|
||
- **max_notes_per_keyword**: 每个关键词的最大笔记数
|
||
- **cookies**: Cookie字符串(可选)
|
||
"""
    try:
        logger.info(f"Starting batch search, number of keywords: {len(keywords)}")

        # Get the XHS crawler
        xhs_crawler = pipeline["xhs_crawler"]

        # Apply cookies if provided
        if cookies:
            xhs_crawler.set_cookies(cookies)

        # Run the batch search
        results = xhs_crawler.batch_search(keywords, max_notes_per_keyword)

        # Aggregate statistics
        total_notes = sum(len(result.notes) for result in results.values())
        success_count = sum(1 for result in results.values() if result.success)

        batch_data = {
            "results": {k: v.to_dict() for k, v in results.items()},
            "statistics": {
                "total_keywords": len(keywords),
                "successful_searches": success_count,
                "total_notes_found": total_notes,
                "success_rate": success_count / len(keywords) if keywords else 0
            }
        }

        logger.info(f"Batch search finished, {success_count}/{len(keywords)} keywords succeeded")

        return ApiResponse(
            success=True,
            message=f"Batch search completed, found {total_notes} notes",
            data=batch_data
        )

    except Exception as e:
        error_msg = f"Batch search failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Batch search failed",
                error=error_msg
            ).dict()
        )


@router.post("/keywords/analyze", response_model=KeywordAnalysisResponse, summary="分析关键词")
|
||
async def analyze_keywords(
|
||
request: KeywordAnalysisRequest,
|
||
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
|
||
):
|
||
"""
|
||
分析内容中的关键词
|
||
|
||
- **content**: 待分析的内容
|
||
- **source_type**: 内容源类型
|
||
- **enable_ai_analysis**: 是否启用AI分析
|
||
"""
    try:
        logger.info(f"Starting keyword analysis, content length: {len(request.content)}")

        # Get the keyword analyzer
        keyword_analyzer = pipeline["keyword_analyzer"]

        # Analyze keywords
        analysis_result = await keyword_analyzer.analyze_content(
            content=request.content,
            source_type=request.source_type,
            enable_ai_analysis=request.enable_ai_analysis
        )

        logger.info(f"Keyword analysis finished, extracted {len(analysis_result.keywords)} keywords")

        return KeywordAnalysisResponse(
            success=True,
            message="Keyword analysis completed",
            data=analysis_result.to_dict()
        )

    except Exception as e:
        error_msg = f"Keyword analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=KeywordAnalysisResponse(
                success=False,
                message="Keyword analysis failed",
                error=error_msg
            ).dict()
        )


@router.post("/content/analyze", response_model=ContentAnalysisResponse, summary="分析内容质量")
|
||
async def analyze_content(
|
||
request: ContentAnalysisRequest,
|
||
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
|
||
):
|
||
"""
|
||
分析内容质量和特征
|
||
|
||
- **search_result**: 搜索结果(可选)
|
||
- **notes**: 笔记列表(可选)
|
||
|
||
注意:search_result和notes至少需要提供一个
|
||
"""
    try:
        logger.info("Starting content quality analysis")

        # Get the content analyzer
        content_analyzer = pipeline["content_analyzer"]

        # Prepare the note data
        from travel_algorithms.web_crawling import XHSNote

        if request.search_result:
            # Extract notes from the search result
            raw_notes = request.search_result.get("notes", [])
        elif request.notes:
            # Use the provided notes directly
            raw_notes = request.notes
        else:
            raw_notes = []

        notes_data = []
        for note_dict in raw_notes:
            try:
                notes_data.append(XHSNote(**note_dict))
            except Exception as e:
                logger.warning(f"Failed to parse note: {e}")
                continue

        if not notes_data:
            raise ValueError("No valid note data available for analysis")

        # Analyze the content
        analysis_result = content_analyzer.analyze_xhs_notes(notes_data)

        logger.info(f"Content analysis finished, quality score: {analysis_result.content_quality_score:.2f}")

        return ContentAnalysisResponse(
            success=True,
            message="Content analysis completed",
            data=analysis_result.to_dict()
        )

    except Exception as e:
        error_msg = f"Content analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ContentAnalysisResponse(
                success=False,
                message="Content analysis failed",
                error=error_msg
            ).dict()
        )


@router.post("/search_and_analyze", response_model=ApiResponse, summary="搜索并分析内容")
|
||
async def search_and_analyze(
|
||
keyword: str,
|
||
max_notes: int = 20,
|
||
analyze_keywords: bool = True,
|
||
analyze_content: bool = True,
|
||
cookies: str = None,
|
||
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
|
||
):
|
||
"""
|
||
一站式搜索并分析内容
|
||
|
||
- **keyword**: 搜索关键词
|
||
- **max_notes**: 最大笔记数量
|
||
- **analyze_keywords**: 是否分析关键词
|
||
- **analyze_content**: 是否分析内容质量
|
||
- **cookies**: Cookie字符串(可选)
|
||
"""
    try:
        logger.info(f"Starting search and analysis, keyword: {keyword}")

        # Get the pipeline components
        xhs_crawler = pipeline["xhs_crawler"]
        keyword_analyzer = pipeline["keyword_analyzer"] if analyze_keywords else None
        content_analyzer = pipeline["content_analyzer"] if analyze_content else None

        # Apply cookies if provided
        if cookies:
            xhs_crawler.set_cookies(cookies)

        # 1. Search for notes
        from travel_algorithms.web_crawling import SearchConfig
        search_config = SearchConfig(
            keyword=keyword,
            max_notes=max_notes
        )

        search_result = xhs_crawler.search_notes(search_config)

        if not search_result.success:
            raise Exception(f"Search failed: {search_result.error_message}")

        result_data = {
            "search_result": search_result.to_dict(),
            "keyword_analysis": None,
            "content_analysis": None
        }

        # 2. Keyword analysis (optional)
        if analyze_keywords and keyword_analyzer:
            try:
                # Merge all note content for keyword analysis
                all_content = " ".join([
                    f"{note.title} {note.content}" for note in search_result.notes
                ])

                keyword_result = await keyword_analyzer.analyze_content(
                    content=all_content,
                    source_type="xhs_notes",
                    enable_ai_analysis=True
                )

                result_data["keyword_analysis"] = keyword_result.to_dict()

            except Exception as e:
                logger.warning(f"Keyword analysis failed: {e}")
                result_data["keyword_analysis"] = {"error": str(e)}

        # 3. Content quality analysis (optional)
        if analyze_content and content_analyzer and search_result.notes:
            try:
                content_result = content_analyzer.analyze_xhs_notes(search_result.notes)
                result_data["content_analysis"] = content_result.to_dict()

            except Exception as e:
                logger.warning(f"Content analysis failed: {e}")
                result_data["content_analysis"] = {"error": str(e)}

        logger.info(f"Search and analysis finished, found {len(search_result.notes)} notes")

        return ApiResponse(
            success=True,
            message="Search and analysis completed",
            data=result_data
        )

    except Exception as e:
        error_msg = f"Search and analysis failed: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Search and analysis failed",
                error=error_msg
            ).dict()
        )


@router.get("/pipeline/stats", response_model=ApiResponse, summary="Get pipeline statistics")
async def get_pipeline_stats(
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_crawling_pipeline']).get_crawling_pipeline)
):
    """Get statistics for the crawler analysis pipeline."""
    try:
        stats = {
            "xhs_crawler": pipeline["xhs_crawler"].get_crawler_stats(),
            "keyword_analyzer": pipeline["keyword_analyzer"].get_analyzer_stats(),
            "content_analyzer": pipeline["content_analyzer"].get_analyzer_stats(),
            "config": {
                "web_crawling": pipeline["config"].web_crawling.dict(),
                "keyword_analysis": pipeline["config"].keyword_analysis.dict(),
                "content_analysis": pipeline["config"].content_analysis.dict()
            }
        }

        return ApiResponse(
            success=True,
            message="Statistics retrieved successfully",
            data=stats
        )

    except Exception as e:
        error_msg = f"Failed to retrieve statistics: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=ApiResponse(
                success=False,
                message="Failed to retrieve statistics",
                error=error_msg
            ).dict()
        )
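

# A minimal sketch of how this router might be mounted in api_v2.main (the prefix,
# tags, and module path are assumptions, not confirmed by this file):
#
#     from fastapi import FastAPI
#     from .routers.crawling import router as crawling_router
#
#     app = FastAPI()
#     app.include_router(crawling_router, prefix="/crawling", tags=["crawling"])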