#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Content integration service

Provides the API service for integrating Xiaohongshu (XHS) content and documents.
"""

import logging
import uuid
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from pathlib import Path

from core.models import (
    XHSNote, XHSSearchResult, DocumentContent, IntegratedContent,
    SearchConfig, IntegrationConfig, IntegrationResult, ProcessingStats
)
from core.xhs_adapter import XHSAdapter
from core.document_adapter import DocumentAdapter
from core.cookie_manager import CookieManager

from api.models.integration import (
    XHSSearchRequest, IntegrationRequest, BatchIntegrationRequest,
    CookieManagementRequest, ExportRequest,
    XHSNoteResponse, XHSSearchResponse, DocumentContentResponse,
    IntegratedContentResponse, IntegrationResponse, CookieStatsResponse,
    ServiceStatusResponse, HealthCheckResponse, TaskSummaryResponse,
    ValidationResponse, ApiResponse, BatchSearchRequest, BatchSearchResponse,
    KeywordSearchResult, NoteInfo
)

logger = logging.getLogger(__name__)


class IntegrationService:
    """Content integration service"""

    def __init__(self, config_manager=None, output_manager=None):
        """
        Initialize the integration service

        Args:
            config_manager: configuration manager
            output_manager: output manager
        """
        self.config_manager = config_manager
        self.output_manager = output_manager

        # Initialize core components
        self.cookie_manager = CookieManager()
        self.xhs_adapter = None
        self.document_adapter = DocumentAdapter()

        # Result storage
        self.integration_results: Dict[str, IntegrationResponse] = {}

        # Statistics
        self.stats = ProcessingStats()

        # Initialize the XHS adapter
        self._initialize_xhs_adapter()

        logger.info("Integration service initialized")

    def _initialize_xhs_adapter(self):
        """Initialize the XHS adapter"""
        try:
            # Fetch a usable cookie
            cookie_string = self.cookie_manager.get_cookie_string()
            if cookie_string:
                self.xhs_adapter = XHSAdapter(cookie_string)
                logger.info("XHS adapter initialized successfully")
            else:
                logger.warning("No usable cookie available; XHS adapter not initialized")
        except Exception as e:
            logger.error(f"Failed to initialize XHS adapter: {e}")
            self.xhs_adapter = None

    def search_xhs_notes(self, request: XHSSearchRequest) -> XHSSearchResponse:
        """Search Xiaohongshu notes"""
        start_time = datetime.now()

        try:
            # Retry initialization lazily in case a cookie was added after startup
            if not self.xhs_adapter:
                self._initialize_xhs_adapter()

            if not self.xhs_adapter:
                return XHSSearchResponse(
                    keyword=request.keyword,
                    notes=[],
                    total_count=0,
                    success=False,
                    error_message="XHS adapter unavailable; check the cookie configuration",
                    processing_time=0.0
                )

            # Build the search configuration
            config = SearchConfig(
                keyword=request.keyword,
                max_notes=request.max_notes,
                sort_type=request.sort_type,
                note_type=request.note_type,
                download_images=request.download_media,
                download_videos=request.download_media
            )

            # Run the search
            result = self.xhs_adapter.search_notes(config)

            # Convert to the API response format
            notes_response = []
            for note in result.notes:
                note_response = XHSNoteResponse(
                    note_id=note.note_id,
                    title=note.title,
                    content=note.content,
                    author=note.author,
                    author_id=note.author_id,
                    tags=note.tags,
                    images=note.images,
                    videos=note.videos,
                    likes=note.likes,
                    comments=note.comments,
                    shares=note.shares,
                    created_time=note.created_time,
                    note_url=note.note_url
                )
                notes_response.append(note_response)

            processing_time = (datetime.now() - start_time).total_seconds()

            return XHSSearchResponse(
                keyword=result.keyword,
                notes=notes_response,
                total_count=result.total_count,
                success=result.success,
                error_message=result.error_message,
                processing_time=processing_time
            )

        except Exception as e:
            logger.error(f"Failed to search XHS notes: {e}")
            processing_time = (datetime.now() - start_time).total_seconds()

            return XHSSearchResponse(
                keyword=request.keyword,
                notes=[],
                total_count=0,
                success=False,
                error_message=str(e),
                processing_time=processing_time
            )

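    # Usage sketch (illustrative, not wired anywhere in this file): calling the
    # search directly from a script or route handler. The field values below are
    # assumptions about typical inputs, not values taken from this codebase.
    #
    #   service = IntegrationService()
    #   response = service.search_xhs_notes(XHSSearchRequest(
    #       keyword="手冲咖啡",
    #       max_notes=10,
    #       sort_type="general",    # assumed value
    #       note_type=0,            # assumed: all note types
    #       download_media=False,
    #   ))
    #   if response.success:
    #       print(f"{response.total_count} notes in {response.processing_time:.2f}s")
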
    def integrate_content(self, request: IntegrationRequest) -> IntegrationResponse:
        """Integrate content"""
        start_time = datetime.now()
        task_id = f"integration_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"

        try:
            # Validate the document paths
            validation_results = self.validate_documents(request.document_paths)
            invalid_docs = [path for path, valid in validation_results.items() if not valid]

            if invalid_docs:
                return IntegrationResponse(
                    task_id=task_id,
                    keyword=request.keyword,
                    success=False,
                    error_message=f"Invalid documents: {', '.join(invalid_docs)}",
                    processing_time=0.0
                )

            # Search Xiaohongshu content
            xhs_request = XHSSearchRequest(
                keyword=request.keyword,
                max_notes=request.max_notes,
                sort_type=request.sort_type,
                note_type=request.note_type,
                download_media=request.download_media
            )
            xhs_result = self.search_xhs_notes(xhs_request)

            # Process the document content
            document_result = self.document_adapter.integrate_documents(request.document_paths)

            # Convert document content to the API response format
            documents_response = []
            for doc in document_result.documents:
                doc_response = DocumentContentResponse(
                    file_path=doc.file_path,
                    content=doc.content,
                    file_type=doc.file_type,
                    metadata=doc.metadata
                )
                documents_response.append(doc_response)

            integrated_content_response = IntegratedContentResponse(
                documents=documents_response,
                integrated_text=document_result.integrated_text,
                summary=document_result.summary,
                key_topics=document_result.key_topics,
                total_length=document_result.total_length
            )

            # Generate the integrated summary
            integrated_summary = self._generate_integrated_summary(xhs_result, document_result)

            processing_time = (datetime.now() - start_time).total_seconds()

            # Build the integration result. Note: the task is marked successful even
            # if the XHS search itself failed; its status is carried in xhs_content.
            integration_result = IntegrationResponse(
                task_id=task_id,
                keyword=request.keyword,
                success=True,
                xhs_content=xhs_result,
                document_content=integrated_content_response,
                integrated_summary=integrated_summary,
                processing_time=processing_time,
                created_time=datetime.now()
            )

            # Store the result
            self.integration_results[task_id] = integration_result

            # Update statistics
            self._update_stats(integration_result)

            return integration_result

        except Exception as e:
            logger.error(f"Content integration failed: {e}")
            processing_time = (datetime.now() - start_time).total_seconds()

            return IntegrationResponse(
                task_id=task_id,
                keyword=request.keyword,
                success=False,
                error_message=str(e),
                processing_time=processing_time,
                created_time=datetime.now()
            )

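    # Usage sketch (illustrative): one integration task combining an XHS search
    # with local documents. The paths and field values are placeholders.
    #
    #   result = service.integrate_content(IntegrationRequest(
    #       keyword="咖啡豆测评",
    #       document_paths=["docs/notes.md", "docs/report.pdf"],
    #       max_notes=20,
    #       sort_type="general",    # assumed value
    #       note_type=0,
    #       download_media=False,
    #   ))
    #   print(result.task_id, result.success)
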
    def batch_integrate_content(self, request: BatchIntegrationRequest) -> Dict[str, IntegrationResponse]:
        """Integrate content in batch"""
        logger.info(f"Starting batch integration with {len(request.tasks)} tasks")

        results = {}
        for i, task in enumerate(request.tasks):
            logger.info(f"Processing task {i+1}/{len(request.tasks)}: {task.keyword}")
            result = self.integrate_content(task)
            results[task.keyword] = result

        success_count = sum(1 for r in results.values() if r.success)
        logger.info(f"Batch integration finished: {success_count}/{len(results)} tasks succeeded")

        return results

    def get_integration_result(self, task_id: str) -> Optional[IntegrationResponse]:
        """Get an integration result"""
        return self.integration_results.get(task_id)

    def list_integration_results(self) -> List[TaskSummaryResponse]:
        """List all integration results"""
        results = []
        for task_id, result in self.integration_results.items():
            # Failed tasks may carry no summary, so guard against None
            summary_text = result.integrated_summary or ""
            summary = TaskSummaryResponse(
                task_id=task_id,
                keyword=result.keyword,
                success=result.success,
                processing_time=result.processing_time,
                created_time=result.created_time,
                notes_count=len(result.xhs_content.notes) if result.xhs_content else 0,
                documents_count=len(result.document_content.documents) if result.document_content else 0,
                content_preview=summary_text[:100] + "..." if len(summary_text) > 100 else summary_text
            )
            results.append(summary)

        return sorted(results, key=lambda x: x.created_time, reverse=True)

    def export_result(self, request: ExportRequest) -> Optional[str]:
        """Export a result"""
        result = self.get_integration_result(request.task_id)
        if not result or not result.success:
            return None

        try:
            # Generate the export content
            export_content = self._generate_export_content(result, request.output_format)

            # Generate the file name
            if request.filename:
                filename = request.filename
            else:
                keyword_safe = "".join(c for c in result.keyword if c.isalnum() or c in (' ', '-', '_')).strip()
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"{keyword_safe}_{timestamp}.txt"

            # Save the file
            if self.output_manager:
                output_path = self.output_manager.save_content(export_content, filename)
                logger.info(f"Export succeeded: {output_path}")
                return str(output_path)
            else:
                # Fall back to the data/output directory
                output_dir = Path("data/output")
                output_dir.mkdir(parents=True, exist_ok=True)
                output_path = output_dir / filename

                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(export_content)

                logger.info(f"Export succeeded: {output_path}")
                return str(output_path)

        except Exception as e:
            logger.error(f"Export failed: {e}")
            return None

    def validate_documents(self, document_paths: List[str]) -> Dict[str, bool]:
        """Validate documents"""
        validation_results = {}

        for doc_path in document_paths:
            try:
                path = Path(doc_path)
                if not path.exists():
                    validation_results[doc_path] = False
                    continue

                # Check the file type
                supported_formats = self.document_adapter.get_supported_formats()
                file_ext = path.suffix.lower()

                validation_results[doc_path] = file_ext in supported_formats

            except Exception as e:
                logger.error(f"Document validation failed for {doc_path}: {e}")
                validation_results[doc_path] = False

        return validation_results

    def add_cookie(self, request: CookieManagementRequest) -> bool:
        """Add a cookie"""
        try:
            success = self.cookie_manager.add_cookie(
                name=request.name,
                cookie_string=request.cookie_string,
                user_info=request.user_info
            )

            if success:
                # Re-initialize the XHS adapter
                self._initialize_xhs_adapter()

            return success
        except Exception as e:
            logger.error(f"Failed to add cookie: {e}")
            return False

    def remove_cookie(self, cookie_name: str) -> bool:
        """Remove a cookie"""
        try:
            success = self.cookie_manager.remove_cookie(cookie_name)

            if success:
                # Re-initialize the XHS adapter
                self._initialize_xhs_adapter()

            return success
        except Exception as e:
            logger.error(f"Failed to remove cookie: {e}")
            return False

    def get_cookie_stats(self) -> CookieStatsResponse:
        """Get cookie statistics"""
        stats = self.cookie_manager.get_statistics()

        return CookieStatsResponse(
            total_cookies=stats.total_cookies,
            valid_cookies=stats.valid_cookies,
            invalid_cookies=stats.invalid_cookies,
            current_cookie=stats.current_cookie,
            cookie_details=stats.cookie_details
        )

    def get_service_status(self) -> ServiceStatusResponse:
        """Get the service status"""
        return ServiceStatusResponse(
            status="running",
            startup_time=datetime.now(),  # TODO: should be the actual service startup time
            output_path=str(Path("data/output").absolute()),
            xhs_service_available=self.xhs_adapter is not None and self.xhs_adapter.is_available(),
            document_service_available=self.document_adapter.is_available(),
            integration_stats={
                "total_tasks": len(self.integration_results),
                "successful_tasks": sum(1 for r in self.integration_results.values() if r.success),
                "failed_tasks": sum(1 for r in self.integration_results.values() if not r.success)
            }
        )

    def get_health_check(self) -> HealthCheckResponse:
        """Run a health check"""
        services = {
            "xhs_service": self.xhs_adapter is not None and self.xhs_adapter.is_available(),
            "document_service": self.document_adapter.is_available(),
            "cookie_manager": len(self.cookie_manager.get_valid_cookies()) > 0
        }

        all_healthy = all(services.values())
        status = "healthy" if all_healthy else "degraded"

        return HealthCheckResponse(
            status=status,
            services=services
        )

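    # Illustrative response shape (actual values depend on runtime state):
    #
    #   HealthCheckResponse(status="degraded", services={
    #       "xhs_service": False,        # e.g. no valid cookie -> adapter unavailable
    #       "document_service": True,
    #       "cookie_manager": False,
    #   })
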
    def _generate_integrated_summary(self, xhs_result: XHSSearchResponse, document_result: IntegratedContent) -> str:
        """Generate the integrated summary"""
        summary_parts = []

        # Xiaohongshu content summary
        if xhs_result.success and xhs_result.notes:
            summary_parts.append(f"XHS search: found {len(xhs_result.notes)} related notes")

            # Highlight the note with the most likes
            top_note = max(xhs_result.notes, key=lambda x: x.likes)
            summary_parts.append(f"Most popular note: \"{top_note.title}\" ({top_note.likes} likes)")

        # Document content summary
        if document_result.documents:
            summary_parts.append(f"Documents: processed {len(document_result.documents)} files")
            summary_parts.append(f"Total document length: {document_result.total_length} characters")

            if document_result.key_topics:
                summary_parts.append(f"Key topics: {', '.join(document_result.key_topics[:3])}")

        return "\n".join(summary_parts)

    def _generate_export_content(self, result: IntegrationResponse, output_format: str) -> str:
        """Generate the export content"""
        content_parts = []

        # Title
        content_parts.append(f"# {result.keyword} - Content Integration Report")
        content_parts.append(f"Generated at: {result.created_time.strftime('%Y-%m-%d %H:%M:%S')}")
        content_parts.append(f"Task ID: {result.task_id}")
        content_parts.append("")

        # Summary
        content_parts.append("## Integrated Summary")
        content_parts.append(result.integrated_summary)
        content_parts.append("")

        # Xiaohongshu content
        if result.xhs_content and result.xhs_content.notes:
            content_parts.append("## Xiaohongshu Content")
            content_parts.append(f"Found {len(result.xhs_content.notes)} notes in total")
            content_parts.append("")

            for i, note in enumerate(result.xhs_content.notes[:5], 1):  # show only the first 5
                content_parts.append(f"### {i}. {note.title}")
                content_parts.append(f"Author: {note.author}")
                content_parts.append(f"Likes: {note.likes} | Comments: {note.comments} | Shares: {note.shares}")
                content_parts.append(f"Content: {note.content[:200]}...")
                content_parts.append("")

        # Document content
        if result.document_content:
            content_parts.append("## Document Content")
            content_parts.append(f"Processed {len(result.document_content.documents)} documents")
            content_parts.append("")

            content_parts.append("### Key Topics")
            content_parts.append(", ".join(result.document_content.key_topics))
            content_parts.append("")

            content_parts.append("### Document Summary")
            content_parts.append(result.document_content.summary)
            content_parts.append("")

        return "\n".join(content_parts)

    def _update_stats(self, result: IntegrationResponse):
        """Update statistics"""
        self.stats.total_tasks += 1
        self.stats.total_processing_time += result.processing_time

        if result.success:
            self.stats.successful_tasks += 1
            if result.xhs_content:
                self.stats.total_notes_processed += len(result.xhs_content.notes)
            if result.document_content:
                self.stats.total_documents_processed += len(result.document_content.documents)
        else:
            self.stats.failed_tasks += 1

        if self.stats.total_tasks > 0:
            self.stats.average_processing_time = self.stats.total_processing_time / self.stats.total_tasks

        self.stats.last_updated = datetime.now()

    def _parse_cookies(self, cookies_input) -> List[Dict[str, Any]]:
        """Parse cookie input in its different supported formats"""
        import time
        from api.models.integration import CookieConfig

        parsed_cookies = []

        if isinstance(cookies_input, str):
            # Format 3: a single raw cookie string
            parsed_cookies.append({
                "cookie_string": cookies_input,
                "name": f"cookie_{int(time.time())}",
                "priority": 1
            })

        elif isinstance(cookies_input, dict):
            # Format 2: plain key/value pairs, joined into a cookie string
            cookie_parts = []
            for key, value in cookies_input.items():
                cookie_parts.append(f"{key}={value}")
            cookie_string = "; ".join(cookie_parts)

            parsed_cookies.append({
                "cookie_string": cookie_string,
                "name": f"dict_cookie_{int(time.time())}",
                "priority": 1
            })

        elif isinstance(cookies_input, list):
            # Format 1: a list of detailed cookie configurations
            for cookie_config in cookies_input:
                if isinstance(cookie_config, CookieConfig):
                    parsed_cookies.append({
                        "cookie_string": cookie_config.cookie_string,
                        "name": cookie_config.name or f"cookie_{len(parsed_cookies)}",
                        "priority": cookie_config.priority,
                        "user_info": cookie_config.user_info
                    })
                elif isinstance(cookie_config, dict):
                    parsed_cookies.append({
                        "cookie_string": cookie_config.get("cookie_string", ""),
                        "name": cookie_config.get("name", f"cookie_{len(parsed_cookies)}"),
                        "priority": cookie_config.get("priority", 1),
                        "user_info": cookie_config.get("user_info")
                    })

        # Sort by priority (highest first)
        parsed_cookies.sort(key=lambda x: x.get("priority", 1), reverse=True)

        logger.info(f"Parsed cookie config: {len(parsed_cookies)} cookies, ordered by priority (high to low)")
        for i, cookie in enumerate(parsed_cookies):
            logger.info(f"  Cookie {i+1}: {cookie.get('name')} (priority: {cookie.get('priority', 1)})")

        return parsed_cookies

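    # The three accepted input shapes, as handled above (values are placeholders):
    #
    #   1. Detailed list:  [CookieConfig(cookie_string="a1=...; web_session=...",
    #                                    name="main", priority=2)]
    #      or plain dicts with the same keys.
    #   2. Key/value dict: {"a1": "...", "web_session": "..."}
    #      which is joined into "a1=...; web_session=...".
    #   3. Raw string:     "a1=...; web_session=..."
    #
    # The cookie field names (a1, web_session) are illustrative, not a claim
    # about which cookies XHS actually requires.
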
    def _get_next_cookie(self, cookies_list: List[Dict[str, Any]], used_cookies: set, enable_rotation: bool) -> Optional[Dict[str, Any]]:
        """Get the next usable cookie"""
        if not enable_rotation:
            # Rotation disabled: always use the first cookie
            return cookies_list[0] if cookies_list else None

        # Rotation enabled: pick the first cookie not yet used
        for cookie in cookies_list:
            cookie_id = cookie.get("name", "")
            if cookie_id not in used_cookies:
                used_cookies.add(cookie_id)
                return cookie

        # All cookies have been used: reset and start over with the first one
        if cookies_list:
            used_cookies.clear()
            first_cookie = cookies_list[0]
            used_cookies.add(first_cookie.get("name", ""))
            return first_cookie

        return None

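    # Rotation sketch: with cookies [A, B] and rotation enabled, successive calls
    # yield A, B, then the used-set is reset and A is returned again, so retries
    # never dead-end on an exhausted pool. With rotation disabled, every call
    # returns A.
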
    async def batch_search(self, request: 'BatchSearchRequest') -> 'BatchSearchResponse':
        """Run a batch search"""
        import time
        import os
        import json
        import requests
        from api.models.integration import BatchSearchResponse, KeywordSearchResult, NoteInfo

        start_time = time.time()
        results = []
        total_notes = 0
        successful_searches = 0
        failed_searches = 0

        # Make sure the output directories exist
        os.makedirs(request.output_path, exist_ok=True)
        os.makedirs(request.image_storage_path, exist_ok=True)

        # Parse the cookie configuration
        cookies_list = self._parse_cookies(request.cookies)
        if not cookies_list:
            raise ValueError("No valid cookie configuration provided")

        used_cookies = set()

        logger.info(f"Starting batch search for {len(request.keywords)} keywords")
        logger.info(f"Cookie count: {len(cookies_list)}")
        logger.info(f"Cookie rotation: {'enabled' if request.cookie_rotation else 'disabled'}")
        logger.info(f"Max retries: {request.max_retries}")

        for keyword in request.keywords:
            keyword_success = False
            keyword_error = None

            # Retry loop per keyword
            for attempt in range(request.max_retries):
                try:
                    # Pick the cookie to use for this attempt
                    current_cookie = self._get_next_cookie(cookies_list, used_cookies, request.cookie_rotation)
                    if not current_cookie:
                        raise ValueError("No cookie available")

                    logger.info(f"Searching keyword: {keyword} (attempt {attempt + 1}/{request.max_retries}, cookie: {current_cookie.get('name')})")

                    # Create an XHS adapter instance for this cookie
                    xhs_adapter = XHSAdapter(cookies_str=current_cookie["cookie_string"])

                    # Configure the search parameters
                    search_config = SearchConfig(
                        keyword=keyword,
                        max_notes=request.max_notes_per_keyword,
                        sort_type=request.sort_type,
                        note_type=request.note_type
                    )

                    # Search notes
                    search_results = xhs_adapter.search_notes(search_config)

                    # Check whether the search succeeded
                    if not search_results.success:
                        raise Exception(f"Search failed: {search_results.error_message}")

                    notes = []
                    for note_data in search_results.notes:
                        try:
                            # Start from the content in the search result, then
                            # optionally fetch the detailed content
                            note_content = note_data.content or ""
                            if request.get_detailed_content and note_data.note_url:
                                try:
                                    detail_result = xhs_adapter.get_note_info(note_data.note_url)
                                    if detail_result and detail_result.content:
                                        note_content = detail_result.content
                                except Exception as e:
                                    logger.warning(f"Failed to fetch note detail {note_data.note_url}: {e}")

                            # Handle images
                            saved_image_paths = []
                            if request.save_images and note_data.images:
                                for i, img_url in enumerate(note_data.images):
                                    try:
                                        # Download the image
                                        response = requests.get(img_url, timeout=10)
                                        if response.status_code == 200:
                                            # Build a filesystem-safe file name
                                            safe_keyword = "".join(c for c in keyword if c.isalnum() or c in (' ', '-', '_')).rstrip()[:20]
                                            safe_title = "".join(c for c in (note_data.title or '')[:20] if c.isalnum() or c in (' ', '-', '_')).rstrip()
                                            filename = f"{safe_keyword}_{safe_title}_{note_data.note_id}_{i+1}.jpg"
                                            filepath = os.path.join(request.image_storage_path, filename)

                                            with open(filepath, 'wb') as f:
                                                f.write(response.content)
                                            saved_image_paths.append(filepath)
                                            logger.debug(f"Saved image: {filepath}")
                                    except Exception as e:
                                        logger.warning(f"Failed to download image {img_url}: {e}")

                            # Build the note info with the proper field mapping
                            note_info = NoteInfo(
                                note_id=note_data.note_id or '',
                                title=note_data.title or '',
                                author=note_data.author or '',
                                content=note_content,
                                like_count=note_data.likes or 0,
                                comment_count=note_data.comments or 0,
                                share_count=note_data.shares or 0,
                                collect_count=getattr(note_data, 'collects', 0),
                                note_url=note_data.note_url or '',
                                images=note_data.images or [],
                                saved_image_paths=saved_image_paths,
                                publish_time=note_data.created_time,
                                location=getattr(note_data, 'location', None),
                                tags=note_data.tags or []
                            )
                            notes.append(note_info)

                        except Exception as e:
                            logger.error(f"Failed to process note data: {e}")
                            continue

                    # Data fetched successfully
                    keyword_result = KeywordSearchResult(
                        keyword=keyword,
                        total_notes=len(notes),
                        notes=notes,
                        search_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        success=True,
                        error_message=None
                    )

                    results.append(keyword_result)
                    total_notes += len(notes)
                    successful_searches += 1
                    keyword_success = True

                    logger.info(f"Keyword '{keyword}' search succeeded with {len(notes)} notes")
                    break  # success: leave the retry loop

                except Exception as e:
                    keyword_error = str(e)
                    logger.warning(f"Keyword '{keyword}' attempt {attempt + 1} failed: {e}")

                    # Retry with the next cookie only if retries remain and
                    # cookie rotation is enabled; otherwise give up on this keyword
                    if attempt < request.max_retries - 1 and request.cookie_rotation:
                        continue
                    else:
                        break

            # All retries exhausted
            if not keyword_success:
                logger.error(f"Keyword '{keyword}' search failed after exhausting all retries")

                keyword_result = KeywordSearchResult(
                    keyword=keyword,
                    total_notes=0,
                    notes=[],
                    search_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    success=False,
                    error_message=keyword_error or "Unknown error"
                )
                results.append(keyword_result)
                failed_searches += 1

        # Compute the processing time
        processing_time = time.time() - start_time

        # Build the summary
        top_authors = []
        total_interactions = 0
        saved_images = 0

        for result in results:
            if result.success:
                for note in result.notes:
                    if note.author and note.author not in top_authors:
                        top_authors.append(note.author)
                    total_interactions += note.like_count + note.comment_count + note.share_count + note.collect_count
                    saved_images += len(note.saved_image_paths)

        summary = {
            "top_authors": top_authors[:10],
            "total_interactions": total_interactions,
            "saved_images": saved_images,
            "avg_notes_per_keyword": total_notes / len(request.keywords) if request.keywords else 0,
            "search_duration": f"{processing_time:.2f}s",
            "cookies_used": len(cookies_list),
            "cookie_rotation_enabled": request.cookie_rotation
        }

        # Persist the search results to a file
        output_file = os.path.join(request.output_path, f"batch_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump({
                    "total_keywords": len(request.keywords),
                    "successful_searches": successful_searches,
                    "failed_searches": failed_searches,
                    "total_notes": total_notes,
                    "results": [result.dict() for result in results],
                    "summary": summary
                }, f, ensure_ascii=False, indent=2)
            logger.info(f"Search results saved to: {output_file}")
        except Exception as e:
            logger.error(f"Failed to save search results: {e}")

        return BatchSearchResponse(
            total_keywords=len(request.keywords),
            successful_searches=successful_searches,
            failed_searches=failed_searches,
            total_notes=total_notes,
            results=results,
            image_storage_path=request.image_storage_path,
            output_path=request.output_path,
            processing_time=processing_time,
            summary=summary
        )
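

# Minimal manual smoke test: a sketch, not part of the API surface. It assumes
# the core/ and api/ packages are importable from the working directory and
# that at least one valid cookie has already been stored where CookieManager
# looks for it; the keyword and field values below are illustrative only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = IntegrationService()
    print(service.get_health_check())

    # Synchronous search path; batch_search is async and would need an event
    # loop (e.g. asyncio.run) plus a BatchSearchRequest instead.
    response = service.search_xhs_notes(XHSSearchRequest(
        keyword="手冲咖啡",       # placeholder keyword
        max_notes=5,
        sort_type="general",      # assumed value; must match what SearchConfig accepts
        note_type=0,              # assumed: all note types
        download_media=False,
    ))
    print(f"success={response.success}, notes={response.total_count}")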