#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content processor.

Merges Xiaohongshu (XHS) search content with extracted document content and
prepares the combined result for LLM processing.
"""

import os
import json
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path

logger = logging.getLogger(__name__)

from document import TextExtractor, ContentIntegrator, ContentTransformer
from document.text_extractor import ExtractedDocument
from document.content_integrator import IntegratedContent
from document.content_transformer import TransformedContent

from .xhs_spider_service import XHSSpiderService, SearchConfig, SearchResult, NoteContent


@dataclass
class ProcessingTask:
    """A content-processing task."""
    task_id: str
    keyword: str
    document_paths: List[str]
    search_config: SearchConfig
    processing_options: Dict[str, Any] = field(default_factory=dict)
    created_time: datetime = field(default_factory=datetime.now)
    status: str = "pending"  # pending, processing, completed, failed


@dataclass
class ProcessedContent:
    """The output produced for a processing task."""
    task_id: str
    keyword: str
    xhs_content: SearchResult
    document_content: IntegratedContent
    combined_content: str
    processed_time: datetime
    statistics: Dict[str, Any]

    def get_full_content(self) -> str:
        """Return the full combined content."""
        return self.combined_content

    def get_summary(self) -> str:
        """Return a short, human-readable summary."""
        summary_parts = []
        summary_parts.append(f"Keyword: {self.keyword}")
        summary_parts.append(f"Processed at: {self.processed_time}")
        summary_parts.append(f"XHS notes: {self.statistics.get('xhs_notes_count', 0)}")
        summary_parts.append(f"Documents: {self.statistics.get('document_count', 0)}")
        summary_parts.append(
            f"Total content length: {self.statistics.get('total_content_length', 0)} chars"
        )

        return "\n".join(summary_parts)


class ContentProcessor:
    """Content processor that merges XHS search results with local documents."""

    def __init__(self,
                 xhs_service: Optional[XHSSpiderService] = None,
                 text_extractor: Optional[TextExtractor] = None,
                 content_integrator: Optional[ContentIntegrator] = None,
                 content_transformer: Optional[ContentTransformer] = None):
        # Initialize service components (defaults are created here if not injected)
        self.xhs_service = xhs_service or XHSSpiderService()
        self.text_extractor = text_extractor or TextExtractor()
        self.content_integrator = content_integrator or ContentIntegrator()
        self.content_transformer = content_transformer or ContentTransformer()

        # In-memory task bookkeeping
        self.processing_tasks: Dict[str, ProcessingTask] = {}
        self.processed_results: Dict[str, ProcessedContent] = {}

    def create_processing_task(self,
                               keyword: str,
                               document_paths: List[str],
                               search_config: Optional[SearchConfig] = None,
                               processing_options: Optional[Dict[str, Any]] = None) -> str:
        """Create a processing task and return its task ID."""
        # Second-resolution timestamp plus a keyword hash; note that two tasks
        # for the same keyword created within the same second would collide.
        task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(keyword) % 10000}"

        if search_config is None:
            search_config = SearchConfig(
                keyword=keyword,
                max_notes=20,
                download_images=True,
                download_videos=True
            )

        task = ProcessingTask(
            task_id=task_id,
            keyword=keyword,
            document_paths=document_paths,
            search_config=search_config,
            processing_options=processing_options or {}
        )

        self.processing_tasks[task_id] = task
        logger.info(f"Created processing task: {task_id}, keyword: {keyword}")

        return task_id

    def process_task(self, task_id: str) -> Optional[ProcessedContent]:
        """Run a task end to end and return the processed result."""
        if task_id not in self.processing_tasks:
            logger.error(f"Task not found: {task_id}")
            return None

        task = self.processing_tasks[task_id]
        task.status = "processing"

        logger.info(f"Processing task: {task_id}")

        try:
            # 1. Search XHS content
            logger.info(f"Searching XHS content: {task.keyword}")
            xhs_result = self.xhs_service.search_notes(task.search_config)

            if not xhs_result.success:
                logger.error(f"XHS search failed: {xhs_result.error_message}")
                task.status = "failed"
                return None

            # 2. Process document content
            logger.info(f"Processing documents: {len(task.document_paths)} file(s)")
            document_content = self._process_documents(task.document_paths)

            # 3. Combine the two sources
            logger.info("Combining content")
            combined_content = self._combine_content(xhs_result, document_content, task)

            # 4. Generate statistics
            statistics = self._generate_statistics(xhs_result, document_content, combined_content)

            # 5. Build the result object
            processed_result = ProcessedContent(
                task_id=task_id,
                keyword=task.keyword,
                xhs_content=xhs_result,
                document_content=document_content,
                combined_content=combined_content,
                processed_time=datetime.now(),
                statistics=statistics
            )

            self.processed_results[task_id] = processed_result
            task.status = "completed"

            logger.info(f"Task completed: {task_id}")
            return processed_result

        except Exception as e:
            logger.error(f"Task processing failed: {str(e)}")
            task.status = "failed"
            return None

    def _process_documents(self, document_paths: List[str]) -> IntegratedContent:
        """Extract text from each document and integrate the results."""
        extracted_docs = []

        for doc_path in document_paths:
            if not os.path.exists(doc_path):
                logger.warning(f"Document not found: {doc_path}")
                continue

            try:
                extracted_doc = self.text_extractor.extract(doc_path)
                extracted_docs.append(extracted_doc)
                logger.info(f"Extracted document: {doc_path}")
            except Exception as e:
                logger.error(f"Failed to extract document {doc_path}: {str(e)}")
                continue

        # Merge the extracted documents into one integrated view
        integrated_content = self.content_integrator.integrate_documents(extracted_docs)
        return integrated_content

    def _combine_content(self,
                         xhs_result: SearchResult,
                         document_content: IntegratedContent,
                         task: ProcessingTask) -> str:
        """Merge XHS content and document content into a single report."""
        content_parts = []

        # 1. Task information
        content_parts.append("=" * 80)
        content_parts.append("Content Integration Report")
        content_parts.append("=" * 80)
        content_parts.append(f"Keyword: {task.keyword}")
        content_parts.append(f"Processed at: {datetime.now()}")
        content_parts.append(f"Task ID: {task.task_id}")
        content_parts.append("")

        # 2. XHS content
        content_parts.append("🔍 XHS Content")
        content_parts.append("=" * 50)
        content_parts.append(f"Search results: {xhs_result.total_notes} notes")
        content_parts.append(f"Successfully processed: {len(xhs_result.notes)} notes")
        content_parts.append("")

        for i, note in enumerate(xhs_result.notes, 1):
            content_parts.append(f"📝 Note {i}: {note.title}")
            content_parts.append(f"Author: {note.user_name}")
            content_parts.append(f"Type: {note.note_type}")
            content_parts.append(f"Published: {note.created_time}")
            content_parts.append(f"Location: {note.location}")
            content_parts.append(
                f"Engagement: 👍{note.stats.get('liked_count', 0)} "
                f"💖{note.stats.get('collected_count', 0)} "
                f"💬{note.stats.get('comment_count', 0)}"
            )
            content_parts.append("")
            content_parts.append("Content:")
            content_parts.append(note.content)
            content_parts.append("")
            if note.tags:
                content_parts.append(f"Tags: {', '.join(note.tags)}")
            content_parts.append("")
            content_parts.append("-" * 40)
            content_parts.append("")

        # 3. Document content
        content_parts.append("📄 Document Content")
        content_parts.append("=" * 50)
        content_parts.append(f"Document count: {document_content.document_count}")
        content_parts.append(f"Document types: {document_content.document_types}")
        content_parts.append(f"Total content length: {document_content.total_content_length} chars")
        content_parts.append("")
        content_parts.append("Document summary:")
        content_parts.append(document_content.content_summary)
        content_parts.append("")
        if document_content.key_topics:
            content_parts.append(f"Key topics: {', '.join(document_content.key_topics)}")
        content_parts.append("")
        content_parts.append("Full document content:")
        content_parts.append(document_content.combined_content)
        content_parts.append("")

        # 4. Integration analysis
        content_parts.append("🔄 Integration Analysis")
        content_parts.append("=" * 50)
        content_parts.append(f"Integrated at: {datetime.now()}")
        content_parts.append("Data sources: XHS + document material")
        content_parts.append("Characteristics: combines live user feedback with reference material")
        content_parts.append("")

        # 5. Key information extraction
        xhs_keywords = self._extract_xhs_keywords(xhs_result.notes)
        doc_keywords = document_content.key_topics

        content_parts.append("🔑 Key Information")
        content_parts.append("-" * 30)
        content_parts.append(f"Trending XHS topics: {', '.join(xhs_keywords[:10])}")
        content_parts.append(f"Key document topics: {', '.join(doc_keywords[:10])}")
        content_parts.append("")

        # 6. Content suggestions
        content_parts.append("💡 Content Suggestions")
        content_parts.append("-" * 30)
        content_parts.append("Based on XHS user feedback and the reference documents, consider these points:")

        # Simple heuristic suggestion generation
        suggestions = self._generate_suggestions(xhs_result, document_content)
        for suggestion in suggestions:
            content_parts.append(f"• {suggestion}")

        content_parts.append("")
        content_parts.append("=" * 80)

        return "\n".join(content_parts)

    def _extract_xhs_keywords(self, notes: List[NoteContent]) -> List[str]:
        """Extract frequent keywords from XHS notes."""
        keyword_count = {}

        for note in notes:
            # Count tags directly
            for tag in note.tags:
                keyword_count[tag] = keyword_count.get(tag, 0) + 1

            # Naive keyword extraction by whitespace splitting (could be
            # improved; see the sketch after this method)
            words = []
            if note.title:
                words.extend(note.title.split())
            if note.content:
                words.extend(note.content.split())

            for word in words:
                if len(word) >= 2 and word.isalpha():
                    keyword_count[word] = keyword_count.get(word, 0) + 1

        # Return keywords by descending frequency, keeping only repeated ones
        sorted_keywords = sorted(keyword_count.items(), key=lambda x: x[1], reverse=True)
        return [keyword for keyword, count in sorted_keywords if count > 1]

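    # A hedged sketch, not part of the original pipeline: whitespace splitting
    # barely tokenizes Chinese text, so a dedicated tokenizer such as jieba
    # (an assumed optional dependency) recovers far more keywords. The method
    # name and dependency are assumptions, shown for illustration only.
    def _extract_xhs_keywords_tokenized(self, notes: List[NoteContent]) -> List[str]:
        """Sketch: keyword extraction using jieba tokenization (optional dependency)."""
        import jieba  # assumed installed: pip install jieba

        keyword_count: Dict[str, int] = {}
        for note in notes:
            for tag in note.tags:
                keyword_count[tag] = keyword_count.get(tag, 0) + 1
            text = f"{note.title or ''} {note.content or ''}"
            for word in jieba.lcut(text):  # jieba.lcut returns a list of tokens
                if len(word) >= 2 and word.isalpha():
                    keyword_count[word] = keyword_count.get(word, 0) + 1

        ranked = sorted(keyword_count.items(), key=lambda x: x[1], reverse=True)
        return [word for word, count in ranked if count > 1]
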
    def _generate_suggestions(self,
                              xhs_result: SearchResult,
                              document_content: IntegratedContent) -> List[str]:
        """Generate content suggestions from both sources."""
        suggestions = []

        # Suggestions based on XHS data
        if xhs_result.notes:
            high_engagement_notes = [note for note in xhs_result.notes
                                     if note.stats.get('liked_count', 0) > 100]
            if high_engagement_notes:
                suggestions.append(
                    f"{len(high_engagement_notes)} high-engagement notes found; study what users respond to"
                )

            # note_type values come from the spider service in Chinese: '视频' = video
            video_notes = [note for note in xhs_result.notes if note.note_type == '视频']
            if video_notes:
                suggestions.append(
                    f"Video share: {len(video_notes)}/{len(xhs_result.notes)}; video formats appear more popular"
                )

        # Suggestions based on document content
        if document_content.document_count > 0:
            suggestions.append(
                f"Grounded in {document_content.document_count} reference document(s) to ensure authority"
            )

        if document_content.key_topics:
            suggestions.append(
                f"Focus on key professional topics: {', '.join(document_content.key_topics[:3])}"
            )

        # General suggestions
        suggestions.append("Blend real user experience with expert knowledge to improve credibility")
        suggestions.append("Address the practical problems and solutions users actually care about")

        return suggestions

    def _generate_statistics(self,
                             xhs_result: SearchResult,
                             document_content: IntegratedContent,
                             combined_content: str) -> Dict[str, Any]:
        """Generate statistics for a processed task."""
        stats = {
            'xhs_notes_count': len(xhs_result.notes),
            'xhs_total_found': xhs_result.total_notes,
            'document_count': document_content.document_count,
            'document_types': document_content.document_types,
            'total_content_length': len(combined_content),
            'xhs_content_length': len(xhs_result.get_all_text_content()),
            'document_content_length': document_content.total_content_length,
            'processing_time': datetime.now().isoformat()
        }

        # XHS engagement statistics
        if xhs_result.notes:
            total_likes = sum(note.stats.get('liked_count', 0) for note in xhs_result.notes)
            total_comments = sum(note.stats.get('comment_count', 0) for note in xhs_result.notes)
            total_collections = sum(note.stats.get('collected_count', 0) for note in xhs_result.notes)

            stats.update({
                'xhs_total_likes': total_likes,
                'xhs_total_comments': total_comments,
                'xhs_total_collections': total_collections,
                'xhs_avg_likes': total_likes / len(xhs_result.notes),
                # note_type values are Chinese: '视频' = video, '图集' = image gallery
                'xhs_video_count': len([n for n in xhs_result.notes if n.note_type == '视频']),
                'xhs_image_count': len([n for n in xhs_result.notes if n.note_type == '图集'])
            })

        return stats

    def get_processed_result(self, task_id: str) -> Optional[ProcessedContent]:
        """Get a processed result by task ID."""
        return self.processed_results.get(task_id)

    def get_task_status(self, task_id: str) -> Optional[str]:
        """Get a task's status."""
        task = self.processing_tasks.get(task_id)
        return task.status if task else None

    def list_tasks(self) -> List[ProcessingTask]:
        """List all tasks."""
        return list(self.processing_tasks.values())

    def list_results(self) -> List[ProcessedContent]:
        """List all processed results."""
        return list(self.processed_results.values())

    def export_result(self, task_id: str, output_path: str, format_type: str = 'summary') -> bool:
        """Export a processed result to a file."""
        result = self.get_processed_result(task_id)
        if not result:
            logger.error(f"Result not found: {task_id}")
            return False

        try:
            # Transform the content according to the requested format
            if format_type != 'raw':
                # Wrap the result in a temporary IntegratedContent for the transformer
                temp_integrated = IntegratedContent(
                    documents=[],
                    document_count=result.document_content.document_count,
                    total_content_length=result.statistics['total_content_length'],
                    document_types=result.document_content.document_types,
                    combined_content=result.combined_content,
                    content_summary=result.get_summary(),
                    key_topics=result.document_content.key_topics
                )

                transformed = self.content_transformer.transform_content(
                    temp_integrated,
                    format_type
                )
                content_to_write = transformed.transformed_text
            else:
                content_to_write = result.combined_content

            # Write to file
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(content_to_write)

            logger.info(f"Exported result to: {output_path}")
            return True

        except Exception as e:
            logger.error(f"Failed to export result: {str(e)}")
            return False

    def cleanup_old_tasks(self, days: int = 7) -> int:
        """Remove tasks (and their results) older than the given number of days."""
        cutoff_time = datetime.now() - timedelta(days=days)
        removed_count = 0

        task_ids_to_remove = []
        for task_id, task in self.processing_tasks.items():
            if task.created_time < cutoff_time:
                task_ids_to_remove.append(task_id)

        for task_id in task_ids_to_remove:
            del self.processing_tasks[task_id]
            if task_id in self.processed_results:
                del self.processed_results[task_id]
            removed_count += 1

        logger.info(f"Cleaned up {removed_count} old task(s)")
        return removed_count
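
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; inputs below are placeholders).
# Because of the relative import above, run this as a module, e.g.
#     python -m services.content_processor
# where `services` is the assumed name of the package containing this file.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = ContentProcessor()

    # Placeholder keyword and document path for illustration
    task_id = processor.create_processing_task(
        keyword="咖啡",
        document_paths=["./docs/coffee_guide.pdf"],
    )

    result = processor.process_task(task_id)
    if result:
        print(result.get_summary())
        processor.export_result(task_id, "./output/report.txt", format_type="raw")
    else:
        print(f"Task status: {processor.get_task_status(task_id)}")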