diff --git a/document/__init__.py b/document/__init__.py
new file mode 100644
index 0000000..47dbe92
--- /dev/null
+++ b/document/__init__.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Document processing module
+Provides document text extraction, content integration, web search, and content transformation
+"""
+
+from .text_extractor import TextExtractor, ExtractedDocument
+from .content_integrator import ContentIntegrator, IntegratedContent
+from .content_transformer import ContentTransformer, TransformedContent
+
+__all__ = [
+    'TextExtractor',
+    'ExtractedDocument',
+    'ContentIntegrator',
+    'IntegratedContent',
+    'ContentTransformer',
+    'TransformedContent'
+]
\ No newline at end of file
diff --git a/document/__pycache__/__init__.cpython-312.pyc b/document/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..c005cd5
Binary files /dev/null and b/document/__pycache__/__init__.cpython-312.pyc differ
diff --git a/document/__pycache__/content_integrator.cpython-312.pyc b/document/__pycache__/content_integrator.cpython-312.pyc
new file mode 100644
index 0000000..ed99577
Binary files /dev/null and b/document/__pycache__/content_integrator.cpython-312.pyc differ
diff --git a/document/__pycache__/content_transformer.cpython-312.pyc b/document/__pycache__/content_transformer.cpython-312.pyc
new file mode 100644
index 0000000..aa8916c
Binary files /dev/null and b/document/__pycache__/content_transformer.cpython-312.pyc differ
diff --git a/document/__pycache__/text_extractor.cpython-312.pyc b/document/__pycache__/text_extractor.cpython-312.pyc
new file mode 100644
index 0000000..6101c64
Binary files /dev/null and b/document/__pycache__/text_extractor.cpython-312.pyc differ
diff --git a/document/__pycache__/web_search.cpython-312.pyc b/document/__pycache__/web_search.cpython-312.pyc
new file mode 100644
index 0000000..171a31e
Binary files /dev/null and b/document/__pycache__/web_search.cpython-312.pyc differ
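Note for reviewers: a minimal end-to-end sketch of how the package exports above are intended to compose. This is a hypothetical usage example, not code from this diff; the input file names are illustrative.

from document import TextExtractor, ContentIntegrator, ContentTransformer

# Hypothetical pipeline: extract -> integrate -> transform.
extractor = TextExtractor()
docs = extractor.extract_batch(["brochure.pdf", "pricing.xlsx"])  # illustrative paths

integrated = ContentIntegrator().integrate_documents(docs)

transformer = ContentTransformer()
result = transformer.transform_content(integrated, format_type='travel_guide')
print(result.transformed_text)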
diff --git a/document/content_integrator.py b/document/content_integrator.py
new file mode 100644
index 0000000..f37d8ab
--- /dev/null
+++ b/document/content_integrator.py
@@ -0,0 +1,130 @@
+import logging
+from typing import List, Dict, Any, Optional
+from dataclasses import dataclass
+from datetime import datetime
+from .text_extractor import ExtractedDocument
+import re
+
+logger = logging.getLogger(__name__)
+
+@dataclass
+class IntegratedContent:
+    """Integrated content"""
+    documents: List[ExtractedDocument]
+    document_count: int
+    total_content_length: int
+    document_types: Dict[str, int]
+    combined_content: str
+    content_summary: str
+    key_topics: List[str]
+
+    def __post_init__(self):
+        """Post-initialization processing"""
+        if not self.document_types:
+            self.document_types = {}
+            for doc in self.documents:
+                ext = doc.file_type.lower()
+                self.document_types[ext] = self.document_types.get(ext, 0) + 1
+
+class ContentIntegrator:
+    """Content integrator - merges information from multiple documents"""
+
+    def __init__(self):
+        pass
+
+    def integrate_documents(self, documents: List[ExtractedDocument]) -> IntegratedContent:
+        """Integrate multiple documents
+
+        Args:
+            documents: list of extracted documents
+
+        Returns:
+            IntegratedContent: the integrated content
+        """
+        if not documents:
+            return IntegratedContent(
+                documents=[],
+                document_count=0,
+                total_content_length=0,
+                document_types={},
+                combined_content="",
+                content_summary="No document content provided",
+                key_topics=[]
+            )
+
+        # Count document types
+        document_types = {}
+        for doc in documents:
+            ext = doc.file_type.lower()
+            document_types[ext] = document_types.get(ext, 0) + 1
+
+        # Combine the content
+        combined_content = self._combine_content(documents)
+        total_length = len(combined_content)
+
+        # Generate a summary
+        content_summary = self._generate_summary(documents)
+
+        # Extract key topics
+        key_topics = self._extract_key_topics(combined_content)
+
+        return IntegratedContent(
+            documents=documents,
+            document_count=len(documents),
+            total_content_length=total_length,
+            document_types=document_types,
+            combined_content=combined_content,
+            content_summary=content_summary,
+            key_topics=key_topics
+        )
+
+    def _combine_content(self, documents: List[ExtractedDocument]) -> str:
+        """Combine document contents"""
+        combined = []
+
+        for i, doc in enumerate(documents, 1):
+            combined.append(f"=== Document {i}: {doc.filename} ===")
+            combined.append(f"File type: {doc.file_type}")
+            combined.append(f"File size: {doc.file_size} bytes")
+            combined.append(f"Extracted at: {doc.extracted_at}")
+            combined.append("")
+            combined.append("Content:")
+            combined.append(doc.content)
+            combined.append("")
+            combined.append("=" * 50)
+            combined.append("")
+
+        return "\n".join(combined)
+
+    def _generate_summary(self, documents: List[ExtractedDocument]) -> str:
+        """Generate a content summary"""
+        if not documents:
+            return "No document content"
+
+        summary_parts = []
+        summary_parts.append(f"Processed {len(documents)} document(s):")
+
+        for i, doc in enumerate(documents, 1):
+            content_preview = doc.content[:100] + "..." if len(doc.content) > 100 else doc.content
+            summary_parts.append(f"{i}. {doc.filename} ({doc.file_type}): {content_preview}")
+
+        return "\n".join(summary_parts)
+
+    def _extract_key_topics(self, content: str) -> List[str]:
+        """Extract key topics (simple keyword extraction)"""
+        if not content:
+            return []
+
+        # Naive keyword extraction for Chinese text
+        # (a more sophisticated NLP method could be substituted here)
+        words = re.findall(r'[\u4e00-\u9fff]+', content)
+
+        # Count word frequencies
+        word_count = {}
+        for word in words:
+            if len(word) >= 2:  # only consider words of length >= 2
+                word_count[word] = word_count.get(word, 0) + 1
+
+        # Return the 10 most frequent words
+        sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
+        return [word for word, count in sorted_words[:10] if count > 1]
\ No newline at end of file
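A small sketch of the integrator's behavior, assuming only the classes added in this diff: document_types is tallied from file extensions, and an empty input list yields a well-formed empty result rather than an error. The field values are illustrative.

from datetime import datetime
from document import ContentIntegrator, ExtractedDocument

# Build one document by hand (illustrative values).
doc = ExtractedDocument(
    filename="notes.txt",
    file_type=".txt",
    content="示例内容,用于演示整合。",
    metadata={},
    extracted_at=datetime.now(),
    file_size=36,
)

integrator = ContentIntegrator()
integrated = integrator.integrate_documents([doc])
print(integrated.document_types)   # {'.txt': 1}
print(integrated.content_summary)  # one preview line per document

empty = integrator.integrate_documents([])
print(empty.document_count)        # 0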
ValueError(f"不支持的格式类型: {format_type}") + + logger.info(f"开始转换内容,格式: {format_type}") + + # 执行转换 + transform_func = self.supported_formats[format_type] + transformed_text = transform_func(integrated_content, custom_prompt) + + # 生成转换元数据 + transformation_metadata = { + 'format_type': format_type, + 'source_document_count': integrated_content.document_count, + 'source_content_length': integrated_content.total_content_length, + 'transformed_content_length': len(transformed_text), + 'key_topics_used': integrated_content.key_topics, + 'custom_prompt_used': custom_prompt is not None + } + + return TransformedContent( + original_content=integrated_content, + transformed_text=transformed_text, + format_type=format_type, + transformation_metadata=transformation_metadata, + transformed_at=datetime.now() + ) + + def _transform_to_attraction_standard(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str: + """转换为景点标准格式""" + template = """ +# 景点信息整理 + +## 基本信息 +- 文档来源: {document_count}个文档 +- 主要主题: {key_topics} + +## 详细内容 +{combined_content} + +## 内容摘要 +{content_summary} + +--- +*基于提供的文档整理,如需更多信息请参考原始文档* +""" + + return template.format( + document_count=content.document_count, + key_topics=", ".join(content.key_topics[:5]), + combined_content=content.combined_content, + content_summary=content.content_summary + ) + + def _transform_to_product_sales(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str: + """转换为产品销售格式""" + template = """ +# 产品销售资料 + +## 产品特色 +基于{document_count}个文档的信息整理: + +{content_summary} + +## 详细介绍 +{combined_content} + +## 关键卖点 +{key_topics} + +--- +*内容整理自提供的文档资料* +""" + + key_points = "\n".join([f"• {topic}" for topic in content.key_topics[:8]]) + + return template.format( + document_count=content.document_count, + content_summary=content.content_summary, + combined_content=content.combined_content, + key_topics=key_points + ) + + def _transform_to_travel_guide(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str: + """转换为旅游指南格式""" + template = """ +# 旅游指南 + +## 概述 +{content_summary} + +## 详细信息 +{combined_content} + +## 重要提示 +- 信息来源: {document_count}个文档 +- 关键主题: {key_topics} + +--- +*本指南基于提供的文档整理,出行前请核实最新信息* +""" + + return template.format( + content_summary=content.content_summary, + combined_content=content.combined_content, + document_count=content.document_count, + key_topics=", ".join(content.key_topics[:5]) + ) + + def _transform_to_blog_post(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str: + """转换为博客文章格式""" + template = """ +# 博客文章 + +## 前言 +本文基于{document_count}个文档资料整理而成。 + +## 主要内容 + +{combined_content} + +## 总结 +{content_summary} + +## 相关主题 +{key_topics} + +--- +*本文内容整理自多个文档资料* +""" + + topics_list = "\n".join([f"- {topic}" for topic in content.key_topics[:10]]) + + return template.format( + document_count=content.document_count, + combined_content=content.combined_content, + content_summary=content.content_summary, + key_topics=topics_list + ) + + def _transform_to_summary(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str: + """转换为摘要格式""" + template = """ +# 文档内容摘要 + +## 文档统计 +- 文档数量: {document_count} +- 文档类型: {document_types} +- 内容长度: {content_length}字符 + +## 内容摘要 +{content_summary} + +## 关键主题 +{key_topics} + +## 完整内容 +{combined_content} +""" + + doc_types = ", ".join([f"{k}({v}个)" for k, v in content.document_types.items()]) + topics_list = "\n".join([f"• {topic}" for topic in content.key_topics]) + + return template.format( + 
diff --git a/document/text_extractor.py b/document/text_extractor.py
new file mode 100644
index 0000000..5c60d0a
--- /dev/null
+++ b/document/text_extractor.py
@@ -0,0 +1,356 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Text extractor module
+Supports extracting text content from PDF, Word, TXT, and other document formats
+"""
+
+import os
+import logging
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+from dataclasses import dataclass
+from datetime import datetime
+
+# Import optional dependencies
+try:
+    import PyPDF2
+    import pdfplumber
+    PDF_AVAILABLE = True
+except ImportError:
+    PDF_AVAILABLE = False
+
+try:
+    from docx import Document
+    DOCX_AVAILABLE = True
+except ImportError:
+    DOCX_AVAILABLE = False
+
+try:
+    import openpyxl
+    from openpyxl import load_workbook
+    EXCEL_AVAILABLE = True
+except ImportError:
+    EXCEL_AVAILABLE = False
+
+logger = logging.getLogger(__name__)
+
+@dataclass
+class ExtractedDocument:
+    """Extracted document data"""
+    filename: str
+    file_type: str
+    content: str  # plain text content
+    metadata: Dict[str, Any]  # document metadata
+    extracted_at: datetime
+    file_size: int
+    page_count: Optional[int] = None
+
+    def __post_init__(self):
+        # Ensure content is a string
+        if not isinstance(self.content, str):
+            self.content = str(self.content)
+
+class TextExtractor:
+    """Text extractor - plain text extraction only, preserving all original content"""
+
+    def __init__(self, config: Optional[Dict[str, Any]] = None):
+        self.config = config or {}
+        self.supported_formats = {
+            '.pdf': self._extract_pdf,
+            '.docx': self._extract_docx,
+            '.doc': self._extract_doc,
+            '.txt': self._extract_txt,
+            '.md': self._extract_txt,
+            '.xlsx': self._extract_xlsx,
+            '.xls': self._extract_xls,
+            '.csv': self._extract_csv
+        }
+
+    def extract(self, file_path: str) -> ExtractedDocument:
+        """Extract the text content of a single file"""
+        path_obj = Path(file_path)
+
+        if not path_obj.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+
+        file_ext = path_obj.suffix.lower()
+        if file_ext not in self.supported_formats:
+            raise ValueError(f"Unsupported file format: {file_ext}")
+
+        try:
+            # Gather file info
+            file_size = path_obj.stat().st_size
+
+            # Extract the text content
+            extractor = self.supported_formats[file_ext]
+            content, metadata = extractor(path_obj)
+
+            return ExtractedDocument(
+                filename=path_obj.name,
+                file_type=file_ext,
+                content=content,
+                metadata=metadata,
+                extracted_at=datetime.now(),
+                file_size=file_size,
+                page_count=metadata.get('page_count')
+            )
+
+        except Exception as e:
+            logger.error(f"Error while extracting file {file_path}: {str(e)}")
+            raise
+
+    def extract_batch(self, file_paths: List[str]) -> List[ExtractedDocument]:
+        """Extract the text content of multiple files in batch"""
+        results = []
+
+        for file_path in file_paths:
+            try:
+                result = self.extract(file_path)
+                results.append(result)
+                logger.info(f"Successfully extracted file: {file_path}")
+            except Exception as e:
+                logger.error(f"Failed to extract file {file_path}: {str(e)}")
+                # Create an error record
+                error_doc = ExtractedDocument(
+                    filename=Path(file_path).name,
+                    file_type=Path(file_path).suffix.lower(),
+                    content=f"Extraction failed: {str(e)}",
+                    metadata={"error": str(e)},
+                    extracted_at=datetime.now(),
+                    file_size=0
+                )
+                results.append(error_doc)
+
+        return results
+
+    def _extract_pdf(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from a PDF file"""
+        if not PDF_AVAILABLE:
+            raise ImportError("PyPDF2 and pdfplumber are required: pip install PyPDF2 pdfplumber")
+
+        content_parts = []
+        metadata = {}
+
+        try:
+            # Use pdfplumber first (better text extraction)
+            with pdfplumber.open(file_path) as pdf:
+                metadata['page_count'] = len(pdf.pages)
+
+                for page_num, page in enumerate(pdf.pages, 1):
+                    page_text = page.extract_text()
+                    if page_text:
+                        content_parts.append(f"=== Page {page_num} ===\n{page_text}\n")
+
+                # Gather document metadata
+                if pdf.metadata:
+                    metadata.update({
+                        'title': pdf.metadata.get('Title', ''),
+                        'author': pdf.metadata.get('Author', ''),
+                        'subject': pdf.metadata.get('Subject', ''),
+                        'creator': pdf.metadata.get('Creator', ''),
+                        'producer': pdf.metadata.get('Producer', ''),
+                        'creation_date': pdf.metadata.get('CreationDate', ''),
+                        'modification_date': pdf.metadata.get('ModDate', '')
+                    })
+
+        except Exception as e:
+            logger.warning(f"pdfplumber extraction failed, trying PyPDF2: {str(e)}")
+
+            # Fallback: use PyPDF2
+            with open(file_path, 'rb') as file:
+                pdf_reader = PyPDF2.PdfReader(file)
+                metadata['page_count'] = len(pdf_reader.pages)
+
+                for page_num, page in enumerate(pdf_reader.pages, 1):
+                    page_text = page.extract_text()
+                    if page_text:
+                        content_parts.append(f"=== Page {page_num} ===\n{page_text}\n")
+
+                # Gather document metadata
+                if pdf_reader.metadata:
+                    metadata.update({
+                        'title': pdf_reader.metadata.get('/Title', ''),
+                        'author': pdf_reader.metadata.get('/Author', ''),
+                        'subject': pdf_reader.metadata.get('/Subject', ''),
+                        'creator': pdf_reader.metadata.get('/Creator', ''),
+                        'producer': pdf_reader.metadata.get('/Producer', ''),
+                        'creation_date': pdf_reader.metadata.get('/CreationDate', ''),
+                        'modification_date': pdf_reader.metadata.get('/ModDate', '')
+                    })
+
+        content = '\n'.join(content_parts) if content_parts else ""
+        return content, metadata
+
+    def _extract_docx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from a DOCX file"""
+        if not DOCX_AVAILABLE:
+            raise ImportError("python-docx is required: pip install python-docx")
+
+        doc = Document(str(file_path))
+        content_parts = []
+        metadata = {}
+
+        # Extract all paragraph text
+        for paragraph in doc.paragraphs:
+            if paragraph.text.strip():
+                content_parts.append(paragraph.text)
+
+        # Extract table content
+        for table in doc.tables:
+            table_content = []
+            for row in table.rows:
+                row_content = []
+                for cell in row.cells:
+                    row_content.append(cell.text.strip())
+                table_content.append('\t'.join(row_content))
+            if table_content:
+                content_parts.append('\n=== Table ===\n' + '\n'.join(table_content) + '\n')
+
+        # Gather document properties
+        core_props = doc.core_properties
+        metadata.update({
+            'title': core_props.title or '',
+            'author': core_props.author or '',
+            'subject': core_props.subject or '',
+            'keywords': core_props.keywords or '',
+            'comments': core_props.comments or '',
+            'created': str(core_props.created) if core_props.created else '',
+            'modified': str(core_props.modified) if core_props.modified else '',
+            'last_modified_by': core_props.last_modified_by or '',
+            'paragraph_count': len(doc.paragraphs),
+            'table_count': len(doc.tables)
+        })
+
+        content = '\n'.join(content_parts)
+        return content, metadata
+
+    def _extract_doc(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from a DOC file"""
+        # The legacy DOC format is complex; convert to DOCX or use a dedicated library
+        logger.warning("DOC format support is limited; converting to DOCX is recommended")
+
+        # Try reading it as a text file
+        try:
+            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
+                content = file.read()
+        except Exception:
+            with open(file_path, 'r', encoding='gbk', errors='ignore') as file:
+                content = file.read()
+
+        metadata = {'format': 'doc', 'encoding_note': 'encoding issues are possible'}
+        return content, metadata
+
+    def _extract_txt(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from a TXT/MD file"""
+        encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'utf-16']
+        content = ""
+        used_encoding = ""
+
+        for encoding in encodings:
+            try:
+                with open(file_path, 'r', encoding=encoding) as file:
+                    content = file.read()
+                used_encoding = encoding
+                break
+            except UnicodeDecodeError:
+                continue
+
+        if not content:
+            # Last resort: ignore decoding errors
+            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
+                content = file.read()
+            used_encoding = 'utf-8 (with errors ignored)'
+
+        metadata = {
+            'encoding': used_encoding,
+            'line_count': len(content.splitlines()),
+            'char_count': len(content)
+        }
+
+        return content, metadata
+
+    def _extract_xlsx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from an XLSX file"""
+        if not EXCEL_AVAILABLE:
+            raise ImportError("openpyxl is required: pip install openpyxl")
+
+        workbook = load_workbook(file_path, read_only=True)
+        content_parts = []
+        metadata = {
+            'sheet_count': len(workbook.sheetnames),
+            'sheet_names': workbook.sheetnames
+        }
+
+        for sheet_name in workbook.sheetnames:
+            sheet = workbook[sheet_name]
+            content_parts.append(f"\n=== Sheet: {sheet_name} ===\n")
+
+            for row in sheet.iter_rows(values_only=True):
+                row_content = []
+                for cell in row:
+                    if cell is not None:
+                        row_content.append(str(cell))
+                    else:
+                        row_content.append("")
+                if any(cell.strip() for cell in row_content):  # skip empty rows
+                    content_parts.append('\t'.join(row_content))
+
+        content = '\n'.join(content_parts)
+        return content, metadata
+
+    def _extract_xls(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from an XLS file"""
+        logger.warning("XLS format support is limited; converting to XLSX is recommended")
+
+        # Naive text extraction
+        try:
+            with open(file_path, 'rb') as file:
+                content = file.read().decode('utf-8', errors='ignore')
+        except Exception:
+            content = f"Unable to read XLS file: {file_path}"
+
+        metadata = {'format': 'xls', 'note': 'formatting issues are possible'}
+        return content, metadata
+
+    def _extract_csv(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
+        """Extract plain text content from a CSV file"""
+        encodings = ['utf-8', 'gbk', 'gb2312']
+        content = ""
+        used_encoding = ""
+
+        for encoding in encodings:
+            try:
+                with open(file_path, 'r', encoding=encoding) as file:
+                    content = file.read()
+                used_encoding = encoding
+                break
+            except UnicodeDecodeError:
+                continue
+
+        if not content:
+            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
+                content = file.read()
+            used_encoding = 'utf-8 (with errors ignored)'
+
+        # Count rows and columns
+        lines = content.splitlines()
+        row_count = len(lines)
+        col_count = len(lines[0].split(',')) if lines else 0
+
+        metadata = {
+            'encoding': used_encoding,
+            'row_count': row_count,
+            'estimated_col_count': col_count
+        }
+
+        return content, metadata
+
+    def get_supported_formats(self) -> List[str]:
+        """Return the list of supported file formats"""
+        return list(self.supported_formats.keys())
+
+    def is_supported(self, file_path: str) -> bool:
+        """Check whether a file's format is supported"""
+        return Path(file_path).suffix.lower() in self.supported_formats
\ No newline at end of file
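A sketch of the extractor's error contract, per extract_batch above: per-file failures come back as ExtractedDocument records with an "error" key in metadata instead of raising, so one bad file does not abort the batch. The file names are illustrative.

from document import TextExtractor

extractor = TextExtractor()
print(extractor.get_supported_formats())
# ['.pdf', '.docx', '.doc', '.txt', '.md', '.xlsx', '.xls', '.csv']

docs = extractor.extract_batch(["report.docx", "missing.pdf"])  # illustrative paths

ok = [d for d in docs if "error" not in d.metadata]
failed = [d for d in docs if "error" in d.metadata]
print(f"{len(ok)} extracted, {len(failed)} failed")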