#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
内容整合服务
融合xhs_spider和document模块的主要服务入口
"""
import os
import json
import asyncio
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
import logging

from .xhs_spider_service import XHSSpiderService, SearchConfig, SearchResult, NoteContent
from .content_processor import ContentProcessor, ProcessingTask, ProcessedContent
from .cookie_manager import CookieManager
from .image_storage_manager import ImageStorageManager

logger = logging.getLogger(__name__)


@dataclass
class IntegrationConfig:
"""整合配置"""
keyword: str
document_paths: List[str]
max_notes: int = 20
sort_type: int = 0
note_type: int = 0
download_media: bool = True
output_format: str = 'summary'
include_llm_processing: bool = True


@dataclass
class IntegrationResult:
    """The result of one integration run."""
task_id: str
config: IntegrationConfig
success: bool
processed_content: Optional[ProcessedContent] = None
error_message: str = ""
processing_time: float = 0.0
created_time: datetime = field(default_factory=datetime.now)


@dataclass
class ProcessingStats:
    """Aggregate processing statistics for the service."""
total_tasks: int = 0
successful_tasks: int = 0
failed_tasks: int = 0
total_processing_time: float = 0.0
average_processing_time: float = 0.0
total_notes_processed: int = 0
total_documents_processed: int = 0
total_media_downloaded: int = 0
start_time: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)


class ContentIntegrationService:
    """Service that orchestrates XHS search, document processing, and result export."""

    def __init__(self,
                 cookie_config_path: str = "config/cookies.json",
                 media_storage_path: str = "data/media",
                 output_path: str = "data/output"):
        # Initialize the component services
self.xhs_service = XHSSpiderService(cookie_config_path, media_storage_path)
self.content_processor = ContentProcessor(self.xhs_service)
self.output_path = Path(output_path)
self.output_path.mkdir(parents=True, exist_ok=True)
        # Result store, keyed by task ID
        self.integration_results: Dict[str, IntegrationResult] = {}
        # Aggregate statistics
        self.stats = ProcessingStats()
        logger.info("Content integration service initialized")

    def process_content(self,
                        keyword: str,
                        document_paths: List[str],
                        config: Optional[IntegrationConfig] = None) -> IntegrationResult:
        """Run a full integration pass: search XHS, process documents, merge, and store the result."""
if config is None:
config = IntegrationConfig(
keyword=keyword,
document_paths=document_paths
)
logger.info(f"开始处理内容整合: {keyword}")
start_time = datetime.now()
# 创建处理任务
search_config = SearchConfig(
keyword=keyword,
max_notes=config.max_notes,
sort_type=config.sort_type,
note_type=config.note_type,
download_images=config.download_media,
download_videos=config.download_media
)
task_id = self.content_processor.create_processing_task(
keyword=keyword,
document_paths=document_paths,
search_config=search_config
)
        # Execute the task
        processed_content = self.content_processor.process_task(task_id)
        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()
        # Build the result
result = IntegrationResult(
task_id=task_id,
config=config,
success=processed_content is not None,
processed_content=processed_content,
processing_time=processing_time
)
        if processed_content is None:
            result.error_message = "Processing failed; see the logs for details"
            logger.error(f"Content integration failed: {keyword}")
        else:
            logger.info(f"Content integration succeeded: {keyword} in {processing_time:.2f}s")
        # Store the result and fold it into the aggregate statistics
        self.integration_results[task_id] = result
        self.update_statistics(result)
        return result
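    # Sketch of the typical two-step flow this method supports (the keyword and
    # path are hypothetical):
    #
    #     result = service.process_content("city walk", ["notes/guide.pdf"])
    #     if result.success:
    #         service.export_result(result.task_id, output_format="summary")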

    def batch_process_content(self,
                              keyword_document_pairs: List[Tuple[str, List[str]]],
                              config: Optional[IntegrationConfig] = None) -> Dict[str, IntegrationResult]:
        """Run content integration for a batch of (keyword, document_paths) pairs."""
        logger.info(f"Starting batch content integration with {len(keyword_document_pairs)} task(s)")
results = {}
for keyword, document_paths in keyword_document_pairs:
result = self.process_content(keyword, document_paths, config)
results[keyword] = result
logger.info(f"批量处理完成,成功 {sum(1 for r in results.values() if r.success)}")
return results

    def search_xhs_only(self, keyword: str, max_notes: int = 20) -> SearchResult:
        """Search Xiaohongshu (XHS) notes only, without any document processing."""
config = SearchConfig(
keyword=keyword,
max_notes=max_notes,
download_images=True,
download_videos=True
)
return self.xhs_service.search_notes(config)

    def process_documents_only(self, document_paths: List[str]) -> Optional[Any]:
        """Process document content only, without an XHS search."""
try:
return self.content_processor._process_documents(document_paths)
except Exception as e:
logger.error(f"处理文档失败: {str(e)}")
return None

    def export_result(self,
                      task_id: str,
                      output_format: str = 'summary',
                      filename: Optional[str] = None) -> Optional[str]:
        """Export a stored result to a file and return its path, or None on failure."""
if task_id not in self.integration_results:
logger.error(f"结果不存在: {task_id}")
return None
result = self.integration_results[task_id]
if not result.success or not result.processed_content:
logger.error(f"结果无效: {task_id}")
return None
        # Generate a filesystem-safe filename from the keyword plus a timestamp
        if filename is None:
            keyword_safe = "".join(c for c in result.config.keyword if c.isalnum() or c in (' ', '-', '_')).rstrip()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{keyword_safe}_{timestamp}.txt"
output_file = self.output_path / filename
        # Delegate the export to the content processor
        success = self.content_processor.export_result(task_id, str(output_file), output_format)
        if success:
            logger.info(f"Export succeeded: {output_file}")
            return str(output_file)
        else:
            logger.error(f"Export failed for task: {task_id}")
return None

    def get_result(self, task_id: str) -> Optional[IntegrationResult]:
        """Return the stored result for a task ID, or None if unknown."""
return self.integration_results.get(task_id)

    def list_results(self) -> List[IntegrationResult]:
        """List all stored integration results."""
return list(self.integration_results.values())

    def get_service_status(self) -> Dict[str, Any]:
        """Return the current service status, including recent task summaries."""
xhs_stats = self.xhs_service.get_service_statistics()
integration_stats = {
'total_tasks': len(self.integration_results),
'successful_tasks': sum(1 for r in self.integration_results.values() if r.success),
'failed_tasks': sum(1 for r in self.integration_results.values() if not r.success),
'recent_tasks': [
{
'task_id': r.task_id,
'keyword': r.config.keyword,
'success': r.success,
'processing_time': r.processing_time,
'created_time': r.created_time.isoformat()
}
for r in sorted(self.integration_results.values(),
key=lambda x: x.created_time, reverse=True)[:10]
]
}
return {
'service_info': {
'status': 'running',
                'startup_time': self.stats.start_time.isoformat(),
'output_path': str(self.output_path)
},
'xhs_service': xhs_stats,
'integration_stats': integration_stats
}

    def cleanup_old_data(self, days: int = 7) -> Dict[str, int]:
        """Remove tasks, orphaned media, and results older than `days` days."""
        logger.info(f"Cleaning up data older than {days} days")
        # Clean up processing tasks
processor_cleanup = self.content_processor.cleanup_old_tasks(days)
        # Clean up media storage
        storage_cleanup = self.xhs_service.cleanup_storage()
        # Drop stored integration results past the cutoff
cutoff_time = datetime.now() - timedelta(days=days)
result_count = 0
task_ids_to_remove = []
for task_id, result in self.integration_results.items():
if result.created_time < cutoff_time:
task_ids_to_remove.append(task_id)
for task_id in task_ids_to_remove:
del self.integration_results[task_id]
result_count += 1
cleanup_stats = {
'processor_tasks_removed': processor_cleanup,
'storage_orphaned_files': storage_cleanup.get('orphaned_files_removed', 0),
'integration_results_removed': result_count
}
logger.info(f"清理完成: {cleanup_stats}")
return cleanup_stats
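    # Illustrative return value (counts depend on what was actually removed):
    #     {"processor_tasks_removed": 3,
    #      "storage_orphaned_files": 12,
    #      "integration_results_removed": 5}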

    def add_cookie(self, name: str, cookie_string: str, user_info: Optional[Dict] = None) -> bool:
        """Add a cookie to the XHS service."""
return self.xhs_service.add_cookie(name, cookie_string, user_info)

    def remove_cookie(self, name: str) -> bool:
        """Remove a cookie by name."""
return self.xhs_service.remove_cookie(name)

    def get_available_cookies(self) -> List[str]:
        """Return the names of the available cookies."""
return self.xhs_service.get_available_cookies()

    def get_supported_document_formats(self) -> List[str]:
        """Return the document formats the text extractor supports."""
return self.content_processor.text_extractor.get_supported_formats()

    def get_available_output_formats(self) -> List[str]:
        """Return the output formats the content transformer supports."""
return self.content_processor.content_transformer.get_supported_formats()

    def validate_documents(self, document_paths: List[str]) -> Dict[str, bool]:
        """Map each document path to whether it exists and has a supported format."""
validation_results = {}
for doc_path in document_paths:
if not os.path.exists(doc_path):
validation_results[doc_path] = False
continue
            try:
                validation_results[doc_path] = self.content_processor.text_extractor.is_supported(doc_path)
            except Exception as e:
                logger.error(f"Document validation failed for {doc_path}: {e}")
validation_results[doc_path] = False
return validation_results

    def create_sample_config(self) -> IntegrationConfig:
        """Create a sample configuration to use as a template."""
        return IntegrationConfig(
            keyword="sample keyword",
document_paths=["path/to/document1.pdf", "path/to/document2.docx"],
max_notes=20,
sort_type=0,
note_type=0,
download_media=True,
output_format='summary',
include_llm_processing=True
)

    def quick_integration(self,
                          keyword: str,
                          document_paths: List[str],
                          output_format: str = 'summary') -> Optional[str]:
        """One-shot helper: validate documents, run the integration, and export."""
        logger.info(f"Quick integration: {keyword}")
        # Validate the documents first
validation_results = self.validate_documents(document_paths)
valid_docs = [path for path, is_valid in validation_results.items() if is_valid]
        if not valid_docs:
            logger.error("No valid documents to process")
            return None
        if len(valid_docs) < len(document_paths):
            logger.warning(f"Some documents are invalid; using only {len(valid_docs)} valid document(s)")
        # Run the integration
config = IntegrationConfig(
keyword=keyword,
document_paths=valid_docs,
output_format=output_format
)
result = self.process_content(keyword, valid_docs, config)
        if not result.success:
            logger.error("Integration processing failed")
            return None
        # Export the result
output_file = self.export_result(result.task_id, output_format)
        if output_file:
            logger.info(f"Quick integration finished: {output_file}")
            return output_file
        else:
            logger.error("Export failed")
return None

    def get_integration_summary(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return a compact summary of a finished task, or None if unavailable."""
result = self.get_result(task_id)
if not result or not result.processed_content:
return None
content = result.processed_content
return {
'task_id': task_id,
'keyword': result.config.keyword,
'success': result.success,
'processing_time': result.processing_time,
'created_time': result.created_time.isoformat(),
'statistics': content.statistics,
'xhs_summary': {
'total_notes': content.xhs_content.total_notes,
'processed_notes': len(content.xhs_content.notes),
'search_time': content.xhs_content.search_time.isoformat()
},
'document_summary': {
'document_count': content.document_content.document_count,
'document_types': content.document_content.document_types,
'total_length': content.document_content.total_content_length,
'key_topics': content.document_content.key_topics[:5]
},
'content_preview': content.get_summary()
}

    async def async_process_content(self,
                                    keyword: str,
                                    document_paths: List[str],
                                    config: Optional[IntegrationConfig] = None) -> IntegrationResult:
        """Async wrapper that runs the blocking pipeline in the default executor."""
        # get_running_loop() is the supported call inside a coroutine
        # (get_event_loop() is deprecated here since Python 3.10)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.process_content, keyword, document_paths, config)
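    # Usage sketch from inside a coroutine (arguments are hypothetical):
    #
    #     result = await service.async_process_content("city walk", ["notes/guide.pdf"])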

    def get_statistics(self) -> ProcessingStats:
        """Return aggregate statistics with derived fields refreshed."""
        self.stats.last_updated = datetime.now()
        # Recompute the average processing time
if self.stats.total_tasks > 0:
self.stats.average_processing_time = self.stats.total_processing_time / self.stats.total_tasks
return self.stats

    def update_statistics(self, result: IntegrationResult):
        """Fold a single result into the aggregate statistics."""
self.stats.total_tasks += 1
self.stats.total_processing_time += result.processing_time
if result.success:
self.stats.successful_tasks += 1
if result.processed_content:
self.stats.total_notes_processed += len(result.processed_content.xhs_content.notes)
self.stats.total_documents_processed += result.processed_content.document_content.document_count
else:
self.stats.failed_tasks += 1
self.stats.last_updated = datetime.now()

    @property
    def cookie_manager(self) -> CookieManager:
        """The XHS service's cookie manager."""
return self.xhs_service.cookie_manager

    @property
    def image_manager(self) -> ImageStorageManager:
        """The XHS service's image storage manager."""
return self.xhs_service.image_manager

    def __del__(self):
        """Log shutdown on garbage collection (best effort)."""
        try:
            logger.info("Content integration service shut down")
        except Exception:
            # Logging machinery may already be torn down at interpreter exit
            pass