bangbang-aigc-server/core/content_service.py
2025-07-31 15:35:23 +08:00

438 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
内容处理器
整合小红书内容和文档内容为LLM处理做准备
"""
import os
import json
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
from document import TextExtractor, ContentIntegrator, ContentTransformer
from document.text_extractor import ExtractedDocument
from document.content_integrator import IntegratedContent
from document.content_transformer import TransformedContent
from .xhs_spider_service import XHSSpiderService, SearchConfig, SearchResult, NoteContent
@dataclass
class ProcessingTask:
"""处理任务"""
task_id: str
keyword: str
document_paths: List[str]
search_config: SearchConfig
processing_options: Dict[str, Any] = field(default_factory=dict)
created_time: datetime = field(default_factory=datetime.now)
status: str = "pending" # pending, processing, completed, failed
@dataclass
class ProcessedContent:
"""处理后的内容"""
task_id: str
keyword: str
xhs_content: SearchResult
document_content: IntegratedContent
combined_content: str
processed_time: datetime
statistics: Dict[str, Any]
def get_full_content(self) -> str:
"""获取完整内容"""
return self.combined_content
def get_summary(self) -> str:
"""获取摘要"""
summary_parts = []
summary_parts.append(f"关键词: {self.keyword}")
summary_parts.append(f"处理时间: {self.processed_time}")
summary_parts.append(f"小红书笔记: {self.statistics.get('xhs_notes_count', 0)}")
summary_parts.append(f"文档数量: {self.statistics.get('document_count', 0)}")
summary_parts.append(f"总内容长度: {self.statistics.get('total_content_length', 0)} 字符")
return "\n".join(summary_parts)
class ContentProcessor:
"""内容处理器"""
def __init__(self,
xhs_service: Optional[XHSSpiderService] = None,
text_extractor: Optional[TextExtractor] = None,
content_integrator: Optional[ContentIntegrator] = None,
content_transformer: Optional[ContentTransformer] = None):
# 初始化服务组件
self.xhs_service = xhs_service or XHSSpiderService()
self.text_extractor = text_extractor or TextExtractor()
self.content_integrator = content_integrator or ContentIntegrator()
self.content_transformer = content_transformer or ContentTransformer()
# 任务管理
self.processing_tasks: Dict[str, ProcessingTask] = {}
self.processed_results: Dict[str, ProcessedContent] = {}
def create_processing_task(self,
keyword: str,
document_paths: List[str],
search_config: Optional[SearchConfig] = None,
processing_options: Optional[Dict[str, Any]] = None) -> str:
"""创建处理任务"""
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(keyword) % 10000}"
if search_config is None:
search_config = SearchConfig(
keyword=keyword,
max_notes=20,
download_images=True,
download_videos=True
)
task = ProcessingTask(
task_id=task_id,
keyword=keyword,
document_paths=document_paths,
search_config=search_config,
processing_options=processing_options or {}
)
self.processing_tasks[task_id] = task
logger.info(f"创建处理任务: {task_id}, 关键词: {keyword}")
return task_id
def process_task(self, task_id: str) -> Optional[ProcessedContent]:
"""处理任务"""
if task_id not in self.processing_tasks:
logger.error(f"任务不存在: {task_id}")
return None
task = self.processing_tasks[task_id]
task.status = "processing"
logger.info(f"开始处理任务: {task_id}")
try:
# 1. 搜索小红书内容
logger.info(f"搜索小红书内容: {task.keyword}")
xhs_result = self.xhs_service.search_notes(task.search_config)
if not xhs_result.success:
logger.error(f"小红书搜索失败: {xhs_result.error_message}")
task.status = "failed"
return None
# 2. 处理文档内容
logger.info(f"处理文档内容: {len(task.document_paths)} 个文档")
document_content = self._process_documents(task.document_paths)
# 3. 整合内容
logger.info("整合内容")
combined_content = self._combine_content(xhs_result, document_content, task)
# 4. 生成统计信息
statistics = self._generate_statistics(xhs_result, document_content, combined_content)
# 5. 创建处理结果
processed_result = ProcessedContent(
task_id=task_id,
keyword=task.keyword,
xhs_content=xhs_result,
document_content=document_content,
combined_content=combined_content,
processed_time=datetime.now(),
statistics=statistics
)
self.processed_results[task_id] = processed_result
task.status = "completed"
logger.info(f"任务处理完成: {task_id}")
return processed_result
except Exception as e:
logger.error(f"处理任务出错: {str(e)}")
task.status = "failed"
return None
def _process_documents(self, document_paths: List[str]) -> IntegratedContent:
"""处理文档"""
extracted_docs = []
for doc_path in document_paths:
if not os.path.exists(doc_path):
logger.warning(f"文档不存在: {doc_path}")
continue
try:
extracted_doc = self.text_extractor.extract(doc_path)
extracted_docs.append(extracted_doc)
logger.info(f"成功提取文档: {doc_path}")
except Exception as e:
logger.error(f"提取文档失败 {doc_path}: {str(e)}")
continue
# 整合文档内容
integrated_content = self.content_integrator.integrate_documents(extracted_docs)
return integrated_content
def _combine_content(self,
xhs_result: SearchResult,
document_content: IntegratedContent,
task: ProcessingTask) -> str:
"""整合小红书内容和文档内容"""
content_parts = []
# 1. 任务信息
content_parts.append("=" * 80)
content_parts.append(f"内容整合报告")
content_parts.append("=" * 80)
content_parts.append(f"关键词: {task.keyword}")
content_parts.append(f"处理时间: {datetime.now()}")
content_parts.append(f"任务ID: {task.task_id}")
content_parts.append("")
# 2. 小红书内容
content_parts.append("🔍 小红书内容")
content_parts.append("=" * 50)
content_parts.append(f"搜索结果: {xhs_result.total_notes} 篇笔记")
content_parts.append(f"成功处理: {len(xhs_result.notes)}")
content_parts.append("")
for i, note in enumerate(xhs_result.notes, 1):
content_parts.append(f"📝 笔记 {i}: {note.title}")
content_parts.append(f"作者: {note.user_name}")
content_parts.append(f"类型: {note.note_type}")
content_parts.append(f"发布时间: {note.created_time}")
content_parts.append(f"位置: {note.location}")
content_parts.append(f"互动数据: 👍{note.stats.get('liked_count', 0)} 💖{note.stats.get('collected_count', 0)} 💬{note.stats.get('comment_count', 0)}")
content_parts.append("")
content_parts.append("内容:")
content_parts.append(note.content)
content_parts.append("")
if note.tags:
content_parts.append(f"标签: {', '.join(note.tags)}")
content_parts.append("")
content_parts.append("-" * 40)
content_parts.append("")
# 3. 文档内容
content_parts.append("📄 文档内容")
content_parts.append("=" * 50)
content_parts.append(f"文档数量: {document_content.document_count}")
content_parts.append(f"文档类型: {document_content.document_types}")
content_parts.append(f"总内容长度: {document_content.total_content_length} 字符")
content_parts.append("")
content_parts.append("文档摘要:")
content_parts.append(document_content.content_summary)
content_parts.append("")
if document_content.key_topics:
content_parts.append(f"关键主题: {', '.join(document_content.key_topics)}")
content_parts.append("")
content_parts.append("完整文档内容:")
content_parts.append(document_content.combined_content)
content_parts.append("")
# 4. 整合分析
content_parts.append("🔄 整合分析")
content_parts.append("=" * 50)
content_parts.append(f"整合时间: {datetime.now()}")
content_parts.append(f"数据来源: 小红书 + 文档资料")
content_parts.append(f"内容特点: 结合了实时用户反馈和专业资料")
content_parts.append("")
# 5. 关键信息提取
xhs_keywords = self._extract_xhs_keywords(xhs_result.notes)
doc_keywords = document_content.key_topics
content_parts.append("🔑 关键信息提取")
content_parts.append("-" * 30)
content_parts.append(f"小红书热门话题: {', '.join(xhs_keywords[:10])}")
content_parts.append(f"文档关键主题: {', '.join(doc_keywords[:10])}")
content_parts.append("")
# 6. 内容建议
content_parts.append("💡 内容建议")
content_parts.append("-" * 30)
content_parts.append("基于小红书用户反馈和专业文档,建议关注以下要点:")
# 简单的建议生成逻辑
suggestions = self._generate_suggestions(xhs_result, document_content)
for suggestion in suggestions:
content_parts.append(f"{suggestion}")
content_parts.append("")
content_parts.append("=" * 80)
return "\n".join(content_parts)
def _extract_xhs_keywords(self, notes: List[NoteContent]) -> List[str]:
"""从小红书笔记中提取关键词"""
keyword_count = {}
for note in notes:
# 提取标签
for tag in note.tags:
keyword_count[tag] = keyword_count.get(tag, 0) + 1
# 简单的关键词提取(可以改进)
words = []
if note.title:
words.extend(note.title.split())
if note.content:
words.extend(note.content.split())
for word in words:
if len(word) >= 2 and word.isalpha():
keyword_count[word] = keyword_count.get(word, 0) + 1
# 返回频次最高的关键词
sorted_keywords = sorted(keyword_count.items(), key=lambda x: x[1], reverse=True)
return [keyword for keyword, count in sorted_keywords if count > 1]
def _generate_suggestions(self,
xhs_result: SearchResult,
document_content: IntegratedContent) -> List[str]:
"""生成内容建议"""
suggestions = []
# 基于小红书数据的建议
if xhs_result.notes:
high_engagement_notes = [note for note in xhs_result.notes
if note.stats.get('liked_count', 0) > 100]
if high_engagement_notes:
suggestions.append(f"高互动笔记共 {len(high_engagement_notes)} 篇,关注用户喜好")
video_notes = [note for note in xhs_result.notes if note.note_type == '视频']
if video_notes:
suggestions.append(f"视频内容占比 {len(video_notes)}/{len(xhs_result.notes)},视频形式更受欢迎")
# 基于文档内容的建议
if document_content.document_count > 0:
suggestions.append(f"结合 {document_content.document_count} 份专业文档,确保内容权威性")
if document_content.key_topics:
suggestions.append(f"重点关注专业主题: {', '.join(document_content.key_topics[:3])}")
# 通用建议
suggestions.append("融合用户真实体验和专业知识,提高内容可信度")
suggestions.append("关注用户关心的实际问题和解决方案")
return suggestions
def _generate_statistics(self,
xhs_result: SearchResult,
document_content: IntegratedContent,
combined_content: str) -> Dict[str, Any]:
"""生成统计信息"""
stats = {
'xhs_notes_count': len(xhs_result.notes),
'xhs_total_found': xhs_result.total_notes,
'document_count': document_content.document_count,
'document_types': document_content.document_types,
'total_content_length': len(combined_content),
'xhs_content_length': len(xhs_result.get_all_text_content()),
'document_content_length': document_content.total_content_length,
'processing_time': datetime.now().isoformat()
}
# 小红书统计
if xhs_result.notes:
total_likes = sum(note.stats.get('liked_count', 0) for note in xhs_result.notes)
total_comments = sum(note.stats.get('comment_count', 0) for note in xhs_result.notes)
total_collections = sum(note.stats.get('collected_count', 0) for note in xhs_result.notes)
stats.update({
'xhs_total_likes': total_likes,
'xhs_total_comments': total_comments,
'xhs_total_collections': total_collections,
'xhs_avg_likes': total_likes / len(xhs_result.notes),
'xhs_video_count': len([n for n in xhs_result.notes if n.note_type == '视频']),
'xhs_image_count': len([n for n in xhs_result.notes if n.note_type == '图集'])
})
return stats
def get_processed_result(self, task_id: str) -> Optional[ProcessedContent]:
"""获取处理结果"""
return self.processed_results.get(task_id)
def get_task_status(self, task_id: str) -> Optional[str]:
"""获取任务状态"""
task = self.processing_tasks.get(task_id)
return task.status if task else None
def list_tasks(self) -> List[ProcessingTask]:
"""列出所有任务"""
return list(self.processing_tasks.values())
def list_results(self) -> List[ProcessedContent]:
"""列出所有结果"""
return list(self.processed_results.values())
def export_result(self, task_id: str, output_path: str, format_type: str = 'summary') -> bool:
"""导出结果"""
result = self.get_processed_result(task_id)
if not result:
logger.error(f"结果不存在: {task_id}")
return False
try:
# 根据格式类型进行转换
if format_type != 'raw':
# 创建临时的整合内容对象进行转换
temp_integrated = IntegratedContent(
documents=[],
document_count=result.document_content.document_count,
total_content_length=result.statistics['total_content_length'],
document_types=result.document_content.document_types,
combined_content=result.combined_content,
content_summary=result.get_summary(),
key_topics=result.document_content.key_topics
)
transformed = self.content_transformer.transform_content(
temp_integrated,
format_type
)
content_to_write = transformed.transformed_text
else:
content_to_write = result.combined_content
# 写入文件
with open(output_path, 'w', encoding='utf-8') as f:
f.write(content_to_write)
logger.info(f"导出结果到: {output_path}")
return True
except Exception as e:
logger.error(f"导出结果失败: {str(e)}")
return False
def cleanup_old_tasks(self, days: int = 7) -> int:
"""清理旧任务"""
cutoff_time = datetime.now() - timedelta(days=days)
removed_count = 0
task_ids_to_remove = []
for task_id, task in self.processing_tasks.items():
if task.created_time < cutoff_time:
task_ids_to_remove.append(task_id)
for task_id in task_ids_to_remove:
del self.processing_tasks[task_id]
if task_id in self.processed_results:
del self.processed_results[task_id]
removed_count += 1
logger.info(f"清理了 {removed_count} 个旧任务")
return removed_count