#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content integration service.

Merges document material with Xiaohongshu (XHS) notes and has an LLM
generate a consolidated set of travel materials.
"""
import json
import os
import time
import logging
from typing import List, Dict, Any
from pathlib import Path
from datetime import datetime

from core.xhs_adapter import XHSAdapter
from core.models import SearchConfig
from core.document_adapter import DocumentAdapter
from core.ai.ai_agent import AIAgent
from core.config import ConfigManager, AIModelConfig
from utils.prompts import PromptTemplate
from utils.file_io import process_llm_json_text

logger = logging.getLogger(__name__)


class ContentIntegrationService:
    """Content integration service."""

    def __init__(self):
        """Initialize the service."""
        self.config_manager = ConfigManager()
        # Load the required configuration
        self.config_manager.load_from_directory("config", server_mode=True)

        # Initialize the AI agent
        ai_config = self.config_manager.get_config('ai_model', AIModelConfig)
        self.ai_agent = AIAgent(ai_config)

        # Initialize adapters
        self.document_adapter = DocumentAdapter()

        # Load the prompt templates
        self.prompt_template = PromptTemplate(
            system_prompt_path="resource/prompt/integration/system.txt",
            user_prompt_path="resource/prompt/integration/user.txt"
        )

    async def integrate_content(
        self,
        document_paths: List[str],
        keywords: List[str],
        cookies: str,
        output_path: str = "data/output",
        sort_type: int = 2,     # 0: default, 1: newest, 2: most liked, 3: most commented, 4: most favorited
        note_type: int = 2,     # 0: all, 1: video notes, 2: regular notes
        note_time: int = 0,     # 0: all, 1: within a day, 2: within a week, 3: within half a year
        note_range: int = 0,    # 0: all, 1: viewed, 2: not viewed, 3: followed
        pos_distance: int = 0,  # 0: all, 1: same city, 2: nearby
        query_num: int = 10
    ) -> Dict[str, Any]:
        """
        Integrate documents with Xiaohongshu content.

        Args:
            document_paths: List of document file paths
            keywords: List of search keywords
            cookies: Xiaohongshu cookie string
            output_path: Output directory
            sort_type: Sort order
            note_type: Note type
            note_time: Note time window
            note_range: Note range
            pos_distance: Location distance
            query_num: Number of notes to fetch per keyword

        Returns:
            Integration result dict
        """
        start_time = time.time()
        logger.info(
            f"Starting integration task: {len(document_paths)} documents, "
            f"{len(keywords)} keywords"
        )

        try:
            # Make sure the output directory exists
            os.makedirs(output_path, exist_ok=True)

            # 1. Process the document content
            logger.info("Processing documents...")
            document_result = self.document_adapter.integrate_documents(document_paths)
            logger.info(
                f"Document processing complete, {len(document_result.documents)} documents processed"
            )

            # 2. Search Xiaohongshu notes
            logger.info("Searching Xiaohongshu notes...")
            xhs_adapter = XHSAdapter(cookies)
            all_notes = []

            for keyword in keywords:
                # Note: only keyword, max_notes, sort_type and note_type are
                # forwarded to the search; the remaining filter parameters are
                # recorded in the result below.
                search_config = SearchConfig(
                    keyword=keyword,
                    max_notes=query_num,
                    sort_type=sort_type,
                    note_type=note_type
                )
                search_result = xhs_adapter.search_notes(search_config)
                if search_result.success:
                    all_notes.extend(search_result.notes)
                    logger.info(
                        f"Keyword '{keyword}' returned {len(search_result.notes)} notes"
                    )
                else:
                    logger.warning(
                        f"Search for keyword '{keyword}' failed: {search_result.error_message}"
                    )

            logger.info(f"Xiaohongshu search complete, {len(all_notes)} notes collected")

            # 3. Prepare the content for LLM integration
            logger.info("Preparing LLM integration...")

            # Build the document content string
            document_content = self._format_document_content(document_result)

            # Build the Xiaohongshu notes content string
            xhs_content = self._format_xhs_notes(all_notes)

            # Build the keywords string
            keywords_str = ", ".join(keywords)
            # 4. Call the LLM to integrate the content
            logger.info("Calling the LLM for content integration...")
            system_prompt = self.prompt_template.get_system_prompt()
            user_prompt = self.prompt_template.build_user_prompt(
                keywords=keywords_str,
                document_content=document_content,
                xhs_notes_content=xhs_content
            )

            # Invoke the AI agent
            response_text, input_tokens, output_tokens, time_cost = await self.ai_agent.generate_text(
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                use_stream=True,
                stage="content_integration"
            )

            # Use the JSON post-processing helper from the file_io module
            parsed_json = process_llm_json_text(response_text)

            # If parsing succeeded, serialize the JSON object back to a string for storage
            if parsed_json:
                cleaned_response = json.dumps(parsed_json, ensure_ascii=False, indent=2)
                logger.info("Successfully parsed and cleaned the LLM's JSON output")
            else:
                # Fall back to the raw response if parsing failed
                cleaned_response = response_text
                logger.warning("JSON parsing failed, using the raw response")

            # 5. Save the result
            processing_time = time.time() - start_time
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            result = {
                "success": True,
                "timestamp": timestamp,
                "processing_time": f"{processing_time:.2f}s",
                "input_summary": {
                    "document_count": len(document_result.documents),
                    "xhs_notes_count": len(all_notes),
                    "keywords": keywords
                },
                "document_info": {
                    "documents": [
                        {
                            "file_path": doc.file_path,
                            "file_type": doc.file_type,
                            "content_length": len(doc.content)
                        }
                        for doc in document_result.documents
                    ],
                    "integrated_text_length": len(document_result.integrated_text)
                },
                "xhs_info": {
                    "total_notes": len(all_notes),
                    "authors": list(set(note.author for note in all_notes if note.author)),
                    "total_interactions": sum(
                        note.likes + note.comments + note.shares for note in all_notes
                    )
                },
                "integrated_content": cleaned_response,
                "search_config": {
                    "sort_type": sort_type,
                    "note_type": note_type,
                    "note_time": note_time,
                    "note_range": note_range,
                    "pos_distance": pos_distance,
                    "query_num": query_num
                }
            }

            # Save the detailed result to a file
            output_file = os.path.join(output_path, f"content_integration_{timestamp}.json")
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)

            logger.info(f"Integration complete, result saved to: {output_file}")
            logger.info(f"Total processing time: {processing_time:.2f}s")

            return result

        except Exception as e:
            error_result = {
                "success": False,
                "error_message": str(e),
                "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
                "processing_time": f"{time.time() - start_time:.2f}s"
            }
            logger.error(f"Content integration failed: {e}")
            return error_result

    def _format_document_content(self, document_result) -> str:
        """Format the document content for the prompt."""
        content_parts = []

        # Add the integrated text
        if document_result.integrated_text:
            content_parts.append("### Integrated Document Content")
            content_parts.append(document_result.integrated_text)
            content_parts.append("")

        # Add each document's detailed content
        if document_result.documents:
            content_parts.append("### Individual Document Details")
            for i, doc in enumerate(document_result.documents, 1):
                content_parts.append(
                    f"#### Document {i}: {Path(doc.file_path).name} ({doc.file_type})"
                )
                content_parts.append(
                    doc.content[:2000] + "..." if len(doc.content) > 2000 else doc.content
                )
                content_parts.append("")

        return "\n".join(content_parts)

    def _format_xhs_notes(self, notes) -> str:
        """Format the Xiaohongshu note content for the prompt."""
        if not notes:
            return "No related notes available"

        content_parts = []
        content_parts.append(f"### Related Xiaohongshu Notes ({len(notes)} total)")
        content_parts.append("")

        for i, note in enumerate(notes, 1):
            content_parts.append(f"#### Note {i}: {note.title}")
            content_parts.append(f"**Author**: {note.author}")
            content_parts.append(
                f"**Engagement**: 👍 {note.likes} | 💬 {note.comments} | 📤 {note.shares}"
            )
            if note.content:
                # Cap the length of each note's content
                content = note.content[:500] + "..." if len(note.content) > 500 else note.content
                content_parts.append(f"**Content**: {content}")
            if note.tags:
                content_parts.append(f"**Tags**: {', '.join(note.tags)}")
            content_parts.append(f"**Link**: {note.note_url}")
            content_parts.append("")

        return "\n".join(content_parts)
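

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the service itself).
# It assumes the config/ directory and the prompt template files referenced
# above exist; the document path, keyword, and cookie string below are
# hypothetical placeholders to be replaced with real values.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        service = ContentIntegrationService()
        result = await service.integrate_content(
            document_paths=["data/input/guide.pdf"],  # placeholder path
            keywords=["Chengdu travel"],              # placeholder keyword
            cookies="<your XHS cookie string>",       # placeholder cookie
            query_num=5
        )
        print("success:", result["success"])

    asyncio.run(_demo())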