#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content integration service.

Main service entry point that combines the xhs_spider and document modules.
"""

import os
import json
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path

from .xhs_spider_service import XHSSpiderService, SearchConfig, SearchResult, NoteContent
from .content_processor import ContentProcessor, ProcessingTask, ProcessedContent
from .cookie_manager import CookieManager
from .image_storage_manager import ImageStorageManager

logger = logging.getLogger(__name__)


@dataclass
class IntegrationConfig:
    """Integration configuration."""
    keyword: str
    document_paths: List[str]
    max_notes: int = 20
    sort_type: int = 0
    note_type: int = 0
    download_media: bool = True
    output_format: str = 'summary'
    include_llm_processing: bool = True


@dataclass
class IntegrationResult:
    """Result of one integration run."""
    task_id: str
    config: IntegrationConfig
    success: bool
    processed_content: Optional[ProcessedContent] = None
    error_message: str = ""
    processing_time: float = 0.0
    created_time: datetime = field(default_factory=datetime.now)


@dataclass
class ProcessingStats:
    """Aggregate processing statistics."""
    total_tasks: int = 0
    successful_tasks: int = 0
    failed_tasks: int = 0
    total_processing_time: float = 0.0
    average_processing_time: float = 0.0
    total_notes_processed: int = 0
    total_documents_processed: int = 0
    total_media_downloaded: int = 0
    start_time: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)


class ContentIntegrationService:
    """Content integration service."""

    def __init__(self,
                 cookie_config_path: str = "config/cookies.json",
                 media_storage_path: str = "data/media",
                 output_path: str = "data/output"):
        # Initialize service components
        self.xhs_service = XHSSpiderService(cookie_config_path, media_storage_path)
        self.content_processor = ContentProcessor(self.xhs_service)
        self.output_path = Path(output_path)
        self.output_path.mkdir(parents=True, exist_ok=True)

        # Result storage, keyed by task id
        self.integration_results: Dict[str, IntegrationResult] = {}

        # Aggregate statistics
        self.stats = ProcessingStats()

        logger.info("Content integration service initialized")

    def process_content(self,
                        keyword: str,
                        document_paths: List[str],
                        config: Optional[IntegrationConfig] = None) -> IntegrationResult:
        """Run one content-integration pass for a keyword and a set of documents."""
        if config is None:
            config = IntegrationConfig(
                keyword=keyword,
                document_paths=document_paths
            )

        logger.info(f"Starting content integration: {keyword}")
        start_time = datetime.now()

        # Create the processing task
        search_config = SearchConfig(
            keyword=keyword,
            max_notes=config.max_notes,
            sort_type=config.sort_type,
            note_type=config.note_type,
            download_images=config.download_media,
            download_videos=config.download_media
        )

        task_id = self.content_processor.create_processing_task(
            keyword=keyword,
            document_paths=document_paths,
            search_config=search_config
        )

        # Execute the task
        processed_content = self.content_processor.process_task(task_id)
        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        # Build the result
        result = IntegrationResult(
            task_id=task_id,
            config=config,
            success=processed_content is not None,
            processed_content=processed_content,
            processing_time=processing_time
        )

        if processed_content is None:
            result.error_message = "Processing failed, see logs for details"
            logger.error(f"Content integration failed: {keyword}")
        else:
            logger.info(f"Content integration succeeded: {keyword}, took {processing_time:.2f}s")

        # Store the result and keep the aggregate statistics in sync
        # (previously the stats were defined but never updated)
        self.integration_results[task_id] = result
        self.update_statistics(result)

        return result

    def batch_process_content(self,
                              keyword_document_pairs: List[Tuple[str, List[str]]],
                              config: Optional[IntegrationConfig] = None) -> Dict[str, IntegrationResult]:
        """Run content integration for a batch of (keyword, document paths) pairs."""
        logger.info(f"Starting batch content integration, {len(keyword_document_pairs)} tasks")
        results = {}

        for keyword, document_paths in keyword_document_pairs:
            result = self.process_content(keyword, document_paths, config)
            results[keyword] = result

        logger.info(f"Batch processing finished, {sum(1 for r in results.values() if r.success)} succeeded")
        return results
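
    # A hedged usage sketch for batch processing: each pair maps a search
    # keyword to the local documents it should be merged with (the keywords and
    # paths below are hypothetical placeholders, not files in this project):
    #
    #   pairs = [
    #       ("skincare routine", ["docs/skincare_notes.pdf"]),
    #       ("home workout", ["docs/plan.docx", "docs/training_log.md"]),
    #   ]
    #   results = service.batch_process_content(pairs)
    #   failed = [kw for kw, r in results.items() if not r.success]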

    def search_xhs_only(self, keyword: str, max_notes: int = 20) -> SearchResult:
        """Search Xiaohongshu content only (no document processing)."""
        config = SearchConfig(
            keyword=keyword,
            max_notes=max_notes,
            download_images=True,
            download_videos=True
        )

        return self.xhs_service.search_notes(config)

    def process_documents_only(self, document_paths: List[str]) -> Optional[Any]:
        """Process document content only (no Xiaohongshu search)."""
        try:
            return self.content_processor._process_documents(document_paths)
        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            return None

    def export_result(self,
                      task_id: str,
                      output_format: str = 'summary',
                      filename: Optional[str] = None) -> Optional[str]:
        """Export a stored result to a file and return the output path."""
        if task_id not in self.integration_results:
            logger.error(f"Result not found: {task_id}")
            return None

        result = self.integration_results[task_id]
        if not result.success or not result.processed_content:
            logger.error(f"Result is invalid: {task_id}")
            return None

        # Build a filesystem-safe filename from the keyword and a timestamp
        if filename is None:
            keyword_safe = "".join(c for c in result.config.keyword if c.isalnum() or c in (' ', '-', '_')).rstrip()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"{keyword_safe}_{timestamp}.txt"

        output_file = self.output_path / filename

        # Delegate the export to the content processor
        success = self.content_processor.export_result(task_id, str(output_file), output_format)

        if success:
            logger.info(f"Export succeeded: {output_file}")
            return str(output_file)
        else:
            logger.error(f"Export failed: {task_id}")
            return None

    def get_result(self, task_id: str) -> Optional[IntegrationResult]:
        """Look up a stored result by task id."""
        return self.integration_results.get(task_id)

    def list_results(self) -> List[IntegrationResult]:
        """List all stored results."""
        return list(self.integration_results.values())

    def get_service_status(self) -> Dict[str, Any]:
        """Return the current service status."""
        xhs_stats = self.xhs_service.get_service_statistics()

        integration_stats = {
            'total_tasks': len(self.integration_results),
            'successful_tasks': sum(1 for r in self.integration_results.values() if r.success),
            'failed_tasks': sum(1 for r in self.integration_results.values() if not r.success),
            'recent_tasks': [
                {
                    'task_id': r.task_id,
                    'keyword': r.config.keyword,
                    'success': r.success,
                    'processing_time': r.processing_time,
                    'created_time': r.created_time.isoformat()
                }
                for r in sorted(self.integration_results.values(),
                                key=lambda x: x.created_time, reverse=True)[:10]
            ]
        }

        return {
            'service_info': {
                'status': 'running',
                # Report the actual service start time, not the query time
                'startup_time': self.stats.start_time.isoformat(),
                'output_path': str(self.output_path)
            },
            'xhs_service': xhs_stats,
            'integration_stats': integration_stats
        }

    def cleanup_old_data(self, days: int = 7) -> Dict[str, int]:
        """Remove data older than the given number of days."""
        logger.info(f"Cleaning up data older than {days} days")

        # Clean up processing tasks
        processor_cleanup = self.content_processor.cleanup_old_tasks(days)

        # Clean up media storage
        storage_cleanup = self.xhs_service.cleanup_storage()

        # Drop integration results older than the cutoff
        cutoff_time = datetime.now() - timedelta(days=days)
        expired_task_ids = [task_id for task_id, result in self.integration_results.items()
                            if result.created_time < cutoff_time]
        for task_id in expired_task_ids:
            del self.integration_results[task_id]

        cleanup_stats = {
            'processor_tasks_removed': processor_cleanup,
            'storage_orphaned_files': storage_cleanup.get('orphaned_files_removed', 0),
            'integration_results_removed': len(expired_task_ids)
        }

        logger.info(f"Cleanup finished: {cleanup_stats}")
        return cleanup_stats

    def add_cookie(self, name: str, cookie_string: str, user_info: Optional[Dict] = None) -> bool:
        """Add a cookie."""
        return self.xhs_service.add_cookie(name, cookie_string, user_info)

    def remove_cookie(self, name: str) -> bool:
        """Remove a cookie."""
        return self.xhs_service.remove_cookie(name)

    def get_available_cookies(self) -> List[str]:
        """List the available cookies."""
        return self.xhs_service.get_available_cookies()

    def get_supported_document_formats(self) -> List[str]:
        """List the supported document formats."""
        return self.content_processor.text_extractor.get_supported_formats()

    def get_available_output_formats(self) -> List[str]:
        """List the available output formats."""
        return self.content_processor.content_transformer.get_supported_formats()

    def validate_documents(self, document_paths: List[str]) -> Dict[str, bool]:
        """Validate documents: each path must exist and have a supported format."""
        validation_results = {}

        for doc_path in document_paths:
            if not os.path.exists(doc_path):
                validation_results[doc_path] = False
                continue

            try:
                validation_results[doc_path] = self.content_processor.text_extractor.is_supported(doc_path)
            except Exception as e:
                logger.error(f"Document validation failed for {doc_path}: {str(e)}")
                validation_results[doc_path] = False

        return validation_results

    def create_sample_config(self) -> IntegrationConfig:
        """Create a sample configuration."""
        return IntegrationConfig(
            keyword="sample keyword",
            document_paths=["path/to/document1.pdf", "path/to/document2.docx"],
            max_notes=20,
            sort_type=0,
            note_type=0,
            download_media=True,
            output_format='summary',
            include_llm_processing=True
        )

    def quick_integration(self,
                          keyword: str,
                          document_paths: List[str],
                          output_format: str = 'summary') -> Optional[str]:
        """Quick integration: validate, process, and export in one call."""
        logger.info(f"Quick integration: {keyword}")

        # Validate the documents and keep only the usable ones
        validation_results = self.validate_documents(document_paths)
        valid_docs = [path for path, is_valid in validation_results.items() if is_valid]

        if not valid_docs:
            logger.error("No valid documents")
            return None

        if len(valid_docs) < len(document_paths):
            logger.warning(f"Some documents are invalid; using only {len(valid_docs)} valid documents")

        # Run the integration
        config = IntegrationConfig(
            keyword=keyword,
            document_paths=valid_docs,
            output_format=output_format
        )

        result = self.process_content(keyword, valid_docs, config)

        if not result.success:
            logger.error("Integration processing failed")
            return None

        # Export the result
        output_file = self.export_result(result.task_id, output_format)

        if output_file:
            logger.info(f"Quick integration finished: {output_file}")
            return output_file
        else:
            logger.error("Export failed")
            return None

    def get_integration_summary(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Build a summary dict for a finished integration task."""
        result = self.get_result(task_id)
        if not result or not result.processed_content:
            return None

        content = result.processed_content

        return {
            'task_id': task_id,
            'keyword': result.config.keyword,
            'success': result.success,
            'processing_time': result.processing_time,
            'created_time': result.created_time.isoformat(),
            'statistics': content.statistics,
            'xhs_summary': {
                'total_notes': content.xhs_content.total_notes,
                'processed_notes': len(content.xhs_content.notes),
                'search_time': content.xhs_content.search_time.isoformat()
            },
            'document_summary': {
                'document_count': content.document_content.document_count,
                'document_types': content.document_content.document_types,
                'total_length': content.document_content.total_content_length,
                'key_topics': content.document_content.key_topics[:5]
            },
            'content_preview': content.get_summary()
        }

    async def async_process_content(self,
                                    keyword: str,
                                    document_paths: List[str],
                                    config: Optional[IntegrationConfig] = None) -> IntegrationResult:
        """Asynchronous wrapper around process_content (runs in a thread executor)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.process_content, keyword, document_paths, config)
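
    # A hedged usage sketch: since async_process_content offloads the blocking
    # call to the default thread pool, several integrations can run concurrently
    # with asyncio.gather (keywords and paths below are hypothetical). Note that
    # this service's shared state is not lock-protected, so heavy concurrency
    # may need extra care:
    #
    #   results = await asyncio.gather(
    #       service.async_process_content("skincare routine", ["docs/a.pdf"]),
    #       service.async_process_content("home workout", ["docs/b.docx"]),
    #   )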

    def get_statistics(self) -> ProcessingStats:
        """Return the processing statistics, refreshing derived fields."""
        self.stats.last_updated = datetime.now()

        # Recompute the average processing time
        if self.stats.total_tasks > 0:
            self.stats.average_processing_time = self.stats.total_processing_time / self.stats.total_tasks

        return self.stats

    def update_statistics(self, result: IntegrationResult):
        """Fold one integration result into the aggregate statistics."""
        self.stats.total_tasks += 1
        self.stats.total_processing_time += result.processing_time

        if result.success:
            self.stats.successful_tasks += 1
            if result.processed_content:
                self.stats.total_notes_processed += len(result.processed_content.xhs_content.notes)
                self.stats.total_documents_processed += result.processed_content.document_content.document_count
        else:
            self.stats.failed_tasks += 1

        self.stats.last_updated = datetime.now()

    @property
    def cookie_manager(self) -> CookieManager:
        """Expose the cookie manager."""
        return self.xhs_service.cookie_manager

    @property
    def image_manager(self) -> ImageStorageManager:
        """Expose the image storage manager."""
        return self.xhs_service.image_manager

    def __del__(self):
        """Log shutdown on teardown."""
        try:
            logger.info("Content integration service shut down")
        except Exception:
            # Logging can fail during interpreter shutdown; ignore it.
            pass
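

# A minimal, hedged usage sketch. It assumes the constructor's default config
# and data paths exist and that valid cookies are configured; the keyword and
# document paths below are hypothetical placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ContentIntegrationService()

    # One-call path: validate documents, integrate, and export a summary.
    output = service.quick_integration(
        keyword="sample keyword",
        document_paths=["path/to/document1.pdf", "path/to/document2.docx"],
        output_format='summary'
    )
    if output:
        print(f"Exported to: {output}")

    # Inspect service state and aggregate statistics.
    status = service.get_service_status()
    print(json.dumps(status['integration_stats'], ensure_ascii=False, indent=2))
    stats = service.get_statistics()
    print(f"Tasks: {stats.total_tasks}, avg time: {stats.average_processing_time:.2f}s")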