TravelContentCreator/api/services/integration_service.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content integration service.
Provides the API service for integrating Xiaohongshu (XHS) content and documents.
"""
import logging
import uuid
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from pathlib import Path

from core.models import (
    XHSNote, XHSSearchResult, DocumentContent, IntegratedContent,
    SearchConfig, IntegrationConfig, IntegrationResult, ProcessingStats
)
from core.xhs_adapter import XHSAdapter
from core.document_adapter import DocumentAdapter
from core.cookie_manager import CookieManager
from api.models.integration import (
    XHSSearchRequest, IntegrationRequest, BatchIntegrationRequest,
    CookieManagementRequest, ExportRequest,
    XHSNoteResponse, XHSSearchResponse, DocumentContentResponse,
    IntegratedContentResponse, IntegrationResponse, CookieStatsResponse,
    ServiceStatusResponse, HealthCheckResponse, TaskSummaryResponse,
    ValidationResponse, ApiResponse, BatchSearchRequest, BatchSearchResponse, KeywordSearchResult, NoteInfo
)

logger = logging.getLogger(__name__)

class IntegrationService:
    """Content integration service."""

    def __init__(self, config_manager=None, output_manager=None):
        """
        Initialize the integration service.

        Args:
            config_manager: configuration manager
            output_manager: output manager
        """
        self.config_manager = config_manager
        self.output_manager = output_manager
        # Initialize core components
        self.cookie_manager = CookieManager()
        self.xhs_adapter = None
        self.document_adapter = DocumentAdapter()
        # Result storage
        self.integration_results: Dict[str, IntegrationResponse] = {}
        # Statistics
        self.stats = ProcessingStats()
        # Initialize the XHS adapter
        self._initialize_xhs_adapter()
        logger.info("Integration service initialized")

    def _initialize_xhs_adapter(self):
        """Initialize the XHS adapter."""
        try:
            # Get an available cookie
            cookie_string = self.cookie_manager.get_cookie_string()
            if cookie_string:
                self.xhs_adapter = XHSAdapter(cookie_string)
                logger.info("XHS adapter initialized successfully")
            else:
                logger.warning("No cookie available; XHS adapter not initialized")
        except Exception as e:
            logger.error(f"Failed to initialize XHS adapter: {e}")
            self.xhs_adapter = None

    def search_xhs_notes(self, request: XHSSearchRequest) -> XHSSearchResponse:
        """Search Xiaohongshu notes."""
        start_time = datetime.now()
        try:
            if not self.xhs_adapter:
                self._initialize_xhs_adapter()
                if not self.xhs_adapter:
                    return XHSSearchResponse(
                        keyword=request.keyword,
                        notes=[],
                        total_count=0,
                        success=False,
                        error_message="XHS adapter unavailable; please check the cookie configuration",
                        processing_time=0.0
                    )
            # Build the search configuration
            config = SearchConfig(
                keyword=request.keyword,
                max_notes=request.max_notes,
                sort_type=request.sort_type,
                note_type=request.note_type,
                download_images=request.download_media,
                download_videos=request.download_media
            )
            # Run the search
            result = self.xhs_adapter.search_notes(config)
            # Convert to the API response format
            notes_response = []
            for note in result.notes:
                note_response = XHSNoteResponse(
                    note_id=note.note_id,
                    title=note.title,
                    content=note.content,
                    author=note.author,
                    author_id=note.author_id,
                    tags=note.tags,
                    images=note.images,
                    videos=note.videos,
                    likes=note.likes,
                    comments=note.comments,
                    shares=note.shares,
                    created_time=note.created_time,
                    note_url=note.note_url
                )
                notes_response.append(note_response)
            processing_time = (datetime.now() - start_time).total_seconds()
            return XHSSearchResponse(
                keyword=result.keyword,
                notes=notes_response,
                total_count=result.total_count,
                success=result.success,
                error_message=result.error_message,
                processing_time=processing_time
            )
        except Exception as e:
            logger.error(f"Xiaohongshu note search failed: {e}")
            processing_time = (datetime.now() - start_time).total_seconds()
            return XHSSearchResponse(
                keyword=request.keyword,
                notes=[],
                total_count=0,
                success=False,
                error_message=str(e),
                processing_time=processing_time
            )

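    # A minimal calling sketch for search_xhs_notes. The field values below are
    # hypothetical and only illustrate the XHSSearchRequest fields this method
    # reads (keyword, max_notes, sort_type, note_type, download_media):
    #
    #   request = XHSSearchRequest(keyword="city walk", max_notes=10,
    #                              sort_type="general", note_type="all",
    #                              download_media=False)
    #   response = service.search_xhs_notes(request)
    #   print(response.success, response.total_count, response.processing_time)
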
    def integrate_content(self, request: IntegrationRequest) -> IntegrationResponse:
        """Integrate content."""
        start_time = datetime.now()
        task_id = f"integration_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
        try:
            # Validate the document paths
            validation_results = self.validate_documents(request.document_paths)
            invalid_docs = [path for path, valid in validation_results.items() if not valid]
            if invalid_docs:
                return IntegrationResponse(
                    task_id=task_id,
                    keyword=request.keyword,
                    success=False,
                    error_message=f"The following documents are invalid: {', '.join(invalid_docs)}",
                    processing_time=0.0
                )
            # Search Xiaohongshu content
            xhs_request = XHSSearchRequest(
                keyword=request.keyword,
                max_notes=request.max_notes,
                sort_type=request.sort_type,
                note_type=request.note_type,
                download_media=request.download_media
            )
            xhs_result = self.search_xhs_notes(xhs_request)
            # Process the document content
            document_result = self.document_adapter.integrate_documents(request.document_paths)
            # Convert the document content to the API response format
            documents_response = []
            for doc in document_result.documents:
                doc_response = DocumentContentResponse(
                    file_path=doc.file_path,
                    content=doc.content,
                    file_type=doc.file_type,
                    metadata=doc.metadata
                )
                documents_response.append(doc_response)
            integrated_content_response = IntegratedContentResponse(
                documents=documents_response,
                integrated_text=document_result.integrated_text,
                summary=document_result.summary,
                key_topics=document_result.key_topics,
                total_length=document_result.total_length
            )
            # Generate the integrated summary
            integrated_summary = self._generate_integrated_summary(xhs_result, document_result)
            processing_time = (datetime.now() - start_time).total_seconds()
            # Build the integration result
            integration_result = IntegrationResponse(
                task_id=task_id,
                keyword=request.keyword,
                success=True,
                xhs_content=xhs_result,
                document_content=integrated_content_response,
                integrated_summary=integrated_summary,
                processing_time=processing_time,
                created_time=datetime.now()
            )
            # Store the result
            self.integration_results[task_id] = integration_result
            # Update statistics
            self._update_stats(integration_result)
            return integration_result
        except Exception as e:
            logger.error(f"Content integration failed: {e}")
            processing_time = (datetime.now() - start_time).total_seconds()
            return IntegrationResponse(
                task_id=task_id,
                keyword=request.keyword,
                success=False,
                error_message=str(e),
                processing_time=processing_time,
                created_time=datetime.now()
            )

    def batch_integrate_content(self, request: BatchIntegrationRequest) -> Dict[str, IntegrationResponse]:
        """Integrate content in batch."""
        logger.info(f"Starting batch integration of {len(request.tasks)} tasks")
        results = {}
        for i, task in enumerate(request.tasks):
            logger.info(f"Processing task {i+1}/{len(request.tasks)}: {task.keyword}")
            result = self.integrate_content(task)
            results[task.keyword] = result
        success_count = sum(1 for r in results.values() if r.success)
        logger.info(f"Batch integration finished; {success_count}/{len(results)} tasks succeeded")
        return results

    def get_integration_result(self, task_id: str) -> Optional[IntegrationResponse]:
        """Get a stored integration result."""
        return self.integration_results.get(task_id)

    def list_integration_results(self) -> List[TaskSummaryResponse]:
        """List all integration results."""
        results = []
        for task_id, result in self.integration_results.items():
            summary = TaskSummaryResponse(
                task_id=task_id,
                keyword=result.keyword,
                success=result.success,
                processing_time=result.processing_time,
                created_time=result.created_time,
                notes_count=len(result.xhs_content.notes) if result.xhs_content else 0,
                documents_count=len(result.document_content.documents) if result.document_content else 0,
                content_preview=result.integrated_summary[:100] + "..." if len(result.integrated_summary) > 100 else result.integrated_summary
            )
            results.append(summary)
        return sorted(results, key=lambda x: x.created_time, reverse=True)

    def export_result(self, request: ExportRequest) -> Optional[str]:
        """Export a result."""
        result = self.get_integration_result(request.task_id)
        if not result or not result.success:
            return None
        try:
            # Generate the export content
            export_content = self._generate_export_content(result, request.output_format)
            # Build the file name
            if request.filename:
                filename = request.filename
            else:
                keyword_safe = "".join(c for c in result.keyword if c.isalnum() or c in (' ', '-', '_')).strip()
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"{keyword_safe}_{timestamp}.txt"
            # Save the file
            if self.output_manager:
                output_path = self.output_manager.save_content(export_content, filename)
                logger.info(f"Export succeeded: {output_path}")
                return str(output_path)
            else:
                # Fall back to the default data/output directory
                output_dir = Path("data/output")
                output_dir.mkdir(parents=True, exist_ok=True)
                output_path = output_dir / filename
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(export_content)
                logger.info(f"Export succeeded: {output_path}")
                return str(output_path)
        except Exception as e:
            logger.error(f"Export failed: {e}")
            return None

    def validate_documents(self, document_paths: List[str]) -> Dict[str, bool]:
        """Validate documents."""
        validation_results = {}
        for doc_path in document_paths:
            try:
                path = Path(doc_path)
                if not path.exists():
                    validation_results[doc_path] = False
                    continue
                # Check the file type
                supported_formats = self.document_adapter.get_supported_formats()
                file_ext = path.suffix.lower()
                validation_results[doc_path] = file_ext in supported_formats
            except Exception as e:
                logger.error(f"Document validation failed for {doc_path}: {e}")
                validation_results[doc_path] = False
        return validation_results

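    # Sketch of the mapping validate_documents returns; the paths are
    # hypothetical and only illustrate the shape (path -> is_valid):
    #
    #   validate_documents(["docs/guide.pdf", "missing.docx"])
    #   -> {"docs/guide.pdf": True, "missing.docx": False}
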
    def add_cookie(self, request: CookieManagementRequest) -> bool:
        """Add a cookie."""
        try:
            success = self.cookie_manager.add_cookie(
                name=request.name,
                cookie_string=request.cookie_string,
                user_info=request.user_info
            )
            if success:
                # Re-initialize the XHS adapter
                self._initialize_xhs_adapter()
            return success
        except Exception as e:
            logger.error(f"Failed to add cookie: {e}")
            return False

    def remove_cookie(self, cookie_name: str) -> bool:
        """Remove a cookie."""
        try:
            success = self.cookie_manager.remove_cookie(cookie_name)
            if success:
                # Re-initialize the XHS adapter
                self._initialize_xhs_adapter()
            return success
        except Exception as e:
            logger.error(f"Failed to remove cookie: {e}")
            return False

    def get_cookie_stats(self) -> CookieStatsResponse:
        """Get cookie statistics."""
        stats = self.cookie_manager.get_statistics()
        return CookieStatsResponse(
            total_cookies=stats.total_cookies,
            valid_cookies=stats.valid_cookies,
            invalid_cookies=stats.invalid_cookies,
            current_cookie=stats.current_cookie,
            cookie_details=stats.cookie_details
        )

    def get_service_status(self) -> ServiceStatusResponse:
        """Get the service status."""
        return ServiceStatusResponse(
            status="running",
            startup_time=datetime.now(),  # TODO: should be the actual service startup time
            output_path=str(Path("data/output").absolute()),
            xhs_service_available=self.xhs_adapter is not None and self.xhs_adapter.is_available(),
            document_service_available=self.document_adapter.is_available(),
            integration_stats={
                "total_tasks": len(self.integration_results),
                "successful_tasks": sum(1 for r in self.integration_results.values() if r.success),
                "failed_tasks": sum(1 for r in self.integration_results.values() if not r.success)
            }
        )

    def get_health_check(self) -> HealthCheckResponse:
        """Run the health check."""
        services = {
            "xhs_service": self.xhs_adapter is not None and self.xhs_adapter.is_available(),
            "document_service": self.document_adapter.is_available(),
            "cookie_manager": len(self.cookie_manager.get_valid_cookies()) > 0
        }
        all_healthy = all(services.values())
        status = "healthy" if all_healthy else "degraded"
        return HealthCheckResponse(
            status=status,
            services=services
        )

    def _generate_integrated_summary(self, xhs_result: XHSSearchResponse, document_result: IntegratedContent) -> str:
        """Generate the integrated summary."""
        summary_parts = []
        # Xiaohongshu content summary
        if xhs_result.success and xhs_result.notes:
            summary_parts.append(f"Xiaohongshu search: found {len(xhs_result.notes)} related notes")
            # Note with the highest like count
            top_note = max(xhs_result.notes, key=lambda x: x.likes)
            summary_parts.append(f"Most popular note: \"{top_note.title}\" ({top_note.likes} likes)")
        # Document content summary
        if document_result.documents:
            summary_parts.append(f"Document processing: {len(document_result.documents)} documents processed")
            summary_parts.append(f"Total document length: {document_result.total_length} characters")
            if document_result.key_topics:
                summary_parts.append(f"Key topics: {', '.join(document_result.key_topics[:3])}")
        return "\n".join(summary_parts)

    def _generate_export_content(self, result: IntegrationResponse, output_format: str) -> str:
        """Generate the export content."""
        content_parts = []
        # Title
        content_parts.append(f"# {result.keyword} - Content Integration Report")
        content_parts.append(f"Generated at: {result.created_time.strftime('%Y-%m-%d %H:%M:%S')}")
        content_parts.append(f"Task ID: {result.task_id}")
        content_parts.append("")
        # Summary
        content_parts.append("## Integrated Summary")
        content_parts.append(result.integrated_summary)
        content_parts.append("")
        # Xiaohongshu content
        if result.xhs_content and result.xhs_content.notes:
            content_parts.append("## Xiaohongshu Content")
            content_parts.append(f"Found {len(result.xhs_content.notes)} notes in total")
            content_parts.append("")
            for i, note in enumerate(result.xhs_content.notes[:5], 1):  # Show only the first 5 notes
                content_parts.append(f"### {i}. {note.title}")
                content_parts.append(f"Author: {note.author}")
                content_parts.append(f"Likes: {note.likes} | Comments: {note.comments} | Shares: {note.shares}")
                content_parts.append(f"Content: {note.content[:200]}...")
                content_parts.append("")
        # Document content
        if result.document_content:
            content_parts.append("## Document Content")
            content_parts.append(f"Processed {len(result.document_content.documents)} documents")
            content_parts.append("")
            content_parts.append("### Key Topics")
            content_parts.append(", ".join(result.document_content.key_topics))
            content_parts.append("")
            content_parts.append("### Document Summary")
            content_parts.append(result.document_content.summary)
            content_parts.append("")
        return "\n".join(content_parts)

    def _update_stats(self, result: IntegrationResponse):
        """Update processing statistics."""
        self.stats.total_tasks += 1
        self.stats.total_processing_time += result.processing_time
        if result.success:
            self.stats.successful_tasks += 1
            if result.xhs_content:
                self.stats.total_notes_processed += len(result.xhs_content.notes)
            if result.document_content:
                self.stats.total_documents_processed += len(result.document_content.documents)
        else:
            self.stats.failed_tasks += 1
        if self.stats.total_tasks > 0:
            self.stats.average_processing_time = self.stats.total_processing_time / self.stats.total_tasks
        self.stats.last_updated = datetime.now()

    def _parse_cookies(self, cookies_input) -> List[Dict[str, Any]]:
        """Parse cookie input given in any of the supported formats."""
        import time
        from api.models.integration import CookieConfig
        parsed_cookies = []
        if isinstance(cookies_input, str):
            # Format 3: a single cookie string
            parsed_cookies.append({
                "cookie_string": cookies_input,
                "name": f"cookie_{int(time.time())}",
                "priority": 1
            })
        elif isinstance(cookies_input, dict):
            # Format 2: a plain key/value mapping, converted to a cookie string
            cookie_parts = []
            for key, value in cookies_input.items():
                cookie_parts.append(f"{key}={value}")
            cookie_string = "; ".join(cookie_parts)
            parsed_cookies.append({
                "cookie_string": cookie_string,
                "name": f"dict_cookie_{int(time.time())}",
                "priority": 1
            })
        elif isinstance(cookies_input, list):
            # Format 1: a list of detailed cookie configurations
            for cookie_config in cookies_input:
                if isinstance(cookie_config, CookieConfig):
                    parsed_cookies.append({
                        "cookie_string": cookie_config.cookie_string,
                        "name": cookie_config.name or f"cookie_{len(parsed_cookies)}",
                        "priority": cookie_config.priority,
                        "user_info": cookie_config.user_info
                    })
                elif isinstance(cookie_config, dict):
                    parsed_cookies.append({
                        "cookie_string": cookie_config.get("cookie_string", ""),
                        "name": cookie_config.get("name", f"cookie_{len(parsed_cookies)}"),
                        "priority": cookie_config.get("priority", 1),
                        "user_info": cookie_config.get("user_info")
                    })
        # Sort by priority (highest first)
        parsed_cookies.sort(key=lambda x: x.get("priority", 1), reverse=True)
        logger.info(f"Parsed cookie configuration: {len(parsed_cookies)} cookies, ordered from highest to lowest priority")
        for i, cookie in enumerate(parsed_cookies):
            logger.info(f"  Cookie {i+1}: {cookie.get('name')} (priority: {cookie.get('priority', 1)})")
        return parsed_cookies

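    # A minimal sketch of the three input shapes _parse_cookies accepts; the
    # cookie values and names below are hypothetical, for illustration only:
    #
    #   "a1=xxx; web_session=yyy"                 -> single cookie string (format 3)
    #   {"a1": "xxx", "web_session": "yyy"}       -> key/value dict joined with "; " (format 2)
    #   [{"cookie_string": "a1=xxx", "name": "main", "priority": 2},
    #    {"cookie_string": "a1=zzz", "name": "backup", "priority": 1}]
    #                                             -> list of configs, sorted by priority (format 1)
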
    def _get_next_cookie(self, cookies_list: List[Dict[str, Any]], used_cookies: set, enable_rotation: bool) -> Optional[Dict[str, Any]]:
        """Get the next available cookie."""
        if not enable_rotation:
            # Rotation disabled: always use the first cookie
            return cookies_list[0] if cookies_list else None
        # Rotation enabled: find a cookie that has not been used yet
        for cookie in cookies_list:
            cookie_id = cookie.get("name", "")
            if cookie_id not in used_cookies:
                used_cookies.add(cookie_id)
                return cookie
        # All cookies have been used: reset and start again from the first one
        if cookies_list:
            used_cookies.clear()
            first_cookie = cookies_list[0]
            used_cookies.add(first_cookie.get("name", ""))
            return first_cookie
        return None

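    # Rotation sketch (cookie names are hypothetical): with cookies
    # ["main", "backup"] and rotation enabled, successive calls yield
    # main, backup, then main again after the used set is reset; with
    # rotation disabled, every call yields main.
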
    async def batch_search(self, request: 'BatchSearchRequest') -> 'BatchSearchResponse':
        """Run a batch search."""
        import time
        import os
        import requests
        from datetime import datetime
        from api.models.integration import BatchSearchResponse, KeywordSearchResult, NoteInfo

        start_time = time.time()
        results = []
        total_notes = 0
        successful_searches = 0
        failed_searches = 0
        # Make sure the output directories exist
        os.makedirs(request.output_path, exist_ok=True)
        os.makedirs(request.image_storage_path, exist_ok=True)
        # Parse the cookie configuration
        cookies_list = self._parse_cookies(request.cookies)
        if not cookies_list:
            raise ValueError("No valid cookie configuration provided")
        used_cookies = set()
        logger.info(f"Starting batch search for {len(request.keywords)} keywords")
        logger.info(f"Number of cookies: {len(cookies_list)}")
        logger.info(f"Cookie rotation: {'enabled' if request.cookie_rotation else 'disabled'}")
        logger.info(f"Max retries: {request.max_retries}")
        for keyword in request.keywords:
            keyword_success = False
            keyword_error = None
            # Retry loop for each keyword
            for attempt in range(request.max_retries):
                try:
                    # Pick the cookie to use for this attempt
                    current_cookie = self._get_next_cookie(cookies_list, used_cookies, request.cookie_rotation)
                    if not current_cookie:
                        raise ValueError("No cookie available")
                    logger.info(f"Searching keyword: {keyword} (attempt {attempt + 1}/{request.max_retries}, cookie: {current_cookie.get('name')})")
                    # Create an XHS adapter instance
                    xhs_adapter = XHSAdapter(cookies_str=current_cookie["cookie_string"])
                    # Configure the search parameters
                    search_config = SearchConfig(
                        keyword=keyword,
                        max_notes=request.max_notes_per_keyword,
                        sort_type=request.sort_type,
                        note_type=request.note_type
                    )
                    # Search for notes
                    search_results = xhs_adapter.search_notes(search_config)
                    # Check whether the search succeeded
                    if not search_results.success:
                        raise Exception(f"Search failed: {search_results.error_message}")
                    notes = []
                    for note_data in search_results.notes:
                        try:
                            # Fetch detailed content, starting from the content in the search result
                            note_content = note_data.content or ""
                            if request.get_detailed_content and note_data.note_url:
                                try:
                                    detail_result = xhs_adapter.get_note_info(note_data.note_url)
                                    if detail_result and detail_result.content:
                                        note_content = detail_result.content
                                except Exception as e:
                                    logger.warning(f"Failed to fetch note detail {note_data.note_url}: {e}")
                            # Handle images
                            saved_image_paths = []
                            if request.save_images and note_data.images:
                                for i, img_url in enumerate(note_data.images):
                                    try:
                                        # Download the image
                                        response = requests.get(img_url, timeout=10)
                                        if response.status_code == 200:
                                            # Build a safe file name
                                            safe_keyword = "".join(c for c in keyword if c.isalnum() or c in (' ', '-', '_')).rstrip()[:20]
                                            safe_title = "".join(c for c in (note_data.title or '')[:20] if c.isalnum() or c in (' ', '-', '_')).rstrip()
                                            filename = f"{safe_keyword}_{safe_title}_{note_data.note_id}_{i+1}.jpg"
                                            filepath = os.path.join(request.image_storage_path, filename)
                                            with open(filepath, 'wb') as f:
                                                f.write(response.content)
                                            saved_image_paths.append(filepath)
                                            logger.debug(f"Saved image: {filepath}")
                                    except Exception as e:
                                        logger.warning(f"Failed to download image {img_url}: {e}")
                            # Build the note info using the correct field mapping
                            note_info = NoteInfo(
                                note_id=note_data.note_id or '',
                                title=note_data.title or '',
                                author=note_data.author or '',
                                content=note_content,
                                like_count=note_data.likes or 0,
                                comment_count=note_data.comments or 0,
                                share_count=note_data.shares or 0,
                                collect_count=getattr(note_data, 'collects', 0),
                                note_url=note_data.note_url or '',
                                images=note_data.images or [],
                                saved_image_paths=saved_image_paths,
                                publish_time=note_data.created_time,
                                location=getattr(note_data, 'location', None),
                                tags=note_data.tags or []
                            )
                            notes.append(note_info)
                        except Exception as e:
                            logger.error(f"Failed to process note data: {e}")
                            continue
                    # Data retrieved successfully
                    keyword_result = KeywordSearchResult(
                        keyword=keyword,
                        total_notes=len(notes),
                        notes=notes,
                        search_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        success=True,
                        error_message=None
                    )
                    results.append(keyword_result)
                    total_notes += len(notes)
                    successful_searches += 1
                    keyword_success = True
                    logger.info(f"Keyword '{keyword}' searched successfully; got {len(notes)} notes")
                    break  # Exit the retry loop on success
                except Exception as e:
                    keyword_error = str(e)
                    logger.warning(f"Keyword '{keyword}' attempt {attempt + 1} failed: {e}")
                    # If retries remain and cookie rotation is enabled, try the next cookie
                    if attempt < request.max_retries - 1 and request.cookie_rotation:
                        continue
                    else:
                        break
            # All retries exhausted for this keyword
            if not keyword_success:
                logger.error(f"Keyword '{keyword}' search failed; all retries exhausted")
                keyword_result = KeywordSearchResult(
                    keyword=keyword,
                    total_notes=0,
                    notes=[],
                    search_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    success=False,
                    error_message=keyword_error or "Unknown error"
                )
                results.append(keyword_result)
                failed_searches += 1
        # Compute the processing time
        processing_time = time.time() - start_time
        # Build the summary
        top_authors = []
        total_interactions = 0
        saved_images = 0
        for result in results:
            if result.success:
                for note in result.notes:
                    if note.author and note.author not in top_authors:
                        top_authors.append(note.author)
                    total_interactions += note.like_count + note.comment_count + note.share_count + note.collect_count
                    saved_images += len(note.saved_image_paths)
        summary = {
            "top_authors": top_authors[:10],
            "total_interactions": total_interactions,
            "saved_images": saved_images,
            "avg_notes_per_keyword": total_notes / len(request.keywords) if request.keywords else 0,
            "search_duration": f"{processing_time:.2f}",
            "cookies_used": len(cookies_list),
            "cookie_rotation_enabled": request.cookie_rotation
        }
        # Save the search results to a file
        output_file = os.path.join(request.output_path, f"batch_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        try:
            import json
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump({
                    "total_keywords": len(request.keywords),
                    "successful_searches": successful_searches,
                    "failed_searches": failed_searches,
                    "total_notes": total_notes,
                    "results": [result.dict() for result in results],
                    "summary": summary
                }, f, ensure_ascii=False, indent=2)
            logger.info(f"Search results saved to: {output_file}")
        except Exception as e:
            logger.error(f"Failed to save search results: {e}")
        return BatchSearchResponse(
            total_keywords=len(request.keywords),
            successful_searches=successful_searches,
            failed_searches=failed_searches,
            total_notes=total_notes,
            results=results,
            image_storage_path=request.image_storage_path,
            output_path=request.output_path,
            processing_time=processing_time,
            summary=summary
        )
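
    # A minimal usage sketch for the service as a whole, assuming
    # IntegrationRequest accepts exactly the fields referenced in
    # integrate_content; the keyword, document path, and parameter values
    # below are hypothetical:
    #
    #   service = IntegrationService()
    #   demo_request = IntegrationRequest(
    #       keyword="travel guide",
    #       document_paths=["docs/sample_itinerary.docx"],
    #       max_notes=5,
    #       sort_type="general",
    #       note_type="all",
    #       download_media=False,
    #   )
    #   demo_result = service.integrate_content(demo_request)
    #   print(demo_result.task_id, demo_result.success, demo_result.processing_time)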