初始化了文档处理模块
This commit is contained in:
parent
cc13f352f6
commit
a0f66a4a49
20
document/__init__.py
Normal file
20
document/__init__.py
Normal file
@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Document processing module.

Provides document text extraction, content integration, web search and
content transformation features.
"""

from .text_extractor import TextExtractor, ExtractedDocument
from .content_integrator import ContentIntegrator, IntegratedContent
from .content_transformer import ContentTransformer, TransformedContent

# NOTE(review): the docstring mentions web search, and a sibling
# .web_search module exists in this package, but nothing from it is
# re-exported here — confirm whether that omission is intentional.
__all__ = [
    'TextExtractor',
    'ExtractedDocument',
    'ContentIntegrator',
    'IntegratedContent',
    'ContentTransformer',
    'TransformedContent'
]
|
||||
BIN
document/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
document/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
document/__pycache__/content_integrator.cpython-312.pyc
Normal file
BIN
document/__pycache__/content_integrator.cpython-312.pyc
Normal file
Binary file not shown.
BIN
document/__pycache__/content_transformer.cpython-312.pyc
Normal file
BIN
document/__pycache__/content_transformer.cpython-312.pyc
Normal file
Binary file not shown.
BIN
document/__pycache__/text_extractor.cpython-312.pyc
Normal file
BIN
document/__pycache__/text_extractor.cpython-312.pyc
Normal file
Binary file not shown.
BIN
document/__pycache__/web_search.cpython-312.pyc
Normal file
BIN
document/__pycache__/web_search.cpython-312.pyc
Normal file
Binary file not shown.
130
document/content_integrator.py
Normal file
130
document/content_integrator.py
Normal file
@ -0,0 +1,130 @@
|
||||
import logging
|
||||
from typing import List, Dict, Any, Optional
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from .text_extractor import ExtractedDocument
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
class IntegratedContent:
    """Consolidated view over a batch of extracted documents."""
    documents: List[ExtractedDocument]   # source documents in input order
    document_count: int                  # number of documents merged
    total_content_length: int            # length of combined_content
    document_types: Dict[str, int]       # lowercase extension -> occurrence count
    combined_content: str                # all documents concatenated with headers
    content_summary: str                 # short per-document overview
    key_topics: List[str]                # most frequent Chinese tokens

    def __post_init__(self):
        """Derive the per-extension histogram when none was supplied."""
        if self.document_types:
            return
        histogram: Dict[str, int] = {}
        for document in self.documents:
            extension = document.file_type.lower()
            histogram[extension] = histogram.get(extension, 0) + 1
        self.document_types = histogram
|
||||
|
||||
class ContentIntegrator:
    """Merges a batch of extracted documents into one consolidated view."""

    def __init__(self):
        pass

    def integrate_documents(self, documents: List[ExtractedDocument]) -> IntegratedContent:
        """Combine several extracted documents into an IntegratedContent.

        Args:
            documents: list of extracted documents (may be empty)

        Returns:
            IntegratedContent: merged text, summary, type histogram and topics
        """
        if not documents:
            # Nothing to merge: return an explicit empty result.
            return IntegratedContent(
                documents=[],
                document_count=0,
                total_content_length=0,
                document_types={},
                combined_content="",
                content_summary="没有提供文档内容",
                key_topics=[]
            )

        # Histogram of lowercase file extensions across the batch.
        type_histogram: Dict[str, int] = {}
        for document in documents:
            extension = document.file_type.lower()
            type_histogram[extension] = type_histogram.get(extension, 0) + 1

        merged_text = self._combine_content(documents)
        summary = self._generate_summary(documents)
        topics = self._extract_key_topics(merged_text)

        return IntegratedContent(
            documents=documents,
            document_count=len(documents),
            total_content_length=len(merged_text),
            document_types=type_histogram,
            combined_content=merged_text,
            content_summary=summary,
            key_topics=topics
        )

    def _combine_content(self, documents: List[ExtractedDocument]) -> str:
        """Concatenate all documents, each prefixed by a metadata header."""
        lines: List[str] = []
        for index, document in enumerate(documents, 1):
            lines.extend([
                f"=== 文档 {index}: {document.filename} ===",
                f"文件类型: {document.file_type}",
                f"文件大小: {document.file_size} 字节",
                f"提取时间: {document.extracted_at}",
                "",
                "内容:",
                document.content,
                "",
                "=" * 50,
                "",
            ])
        return "\n".join(lines)

    def _generate_summary(self, documents: List[ExtractedDocument]) -> str:
        """Build a one-line-per-document overview with a 100-char preview."""
        if not documents:
            return "没有文档内容"

        parts = [f"共处理了 {len(documents)} 个文档:"]
        for index, document in enumerate(documents, 1):
            text = document.content
            preview = text[:100] + "..." if len(text) > 100 else text
            parts.append(f"{index}. {document.filename} ({document.file_type}): {preview}")
        return "\n".join(parts)

    def _extract_key_topics(self, content: str) -> List[str]:
        """Extract key topics via naive Chinese token frequency counting.

        Returns up to 10 of the most frequent CJK tokens (length >= 2) that
        occur more than once; a heavier NLP approach could replace this.
        """
        if not content:
            return []

        frequencies: Dict[str, int] = {}
        for token in re.findall(r'[\u4e00-\u9fff]+', content):
            if len(token) >= 2:  # ignore single-character tokens
                frequencies[token] = frequencies.get(token, 0) + 1

        # Stable sort keeps first-seen order among equal counts, matching
        # dict insertion order.
        ranked = sorted(frequencies.items(), key=lambda item: item[1], reverse=True)
        return [token for token, freq in ranked[:10] if freq > 1]
||||
236
document/content_transformer.py
Normal file
236
document/content_transformer.py
Normal file
@ -0,0 +1,236 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
内容转换器模块
|
||||
使用LLM将解析的文档内容转换为标准化的景区和产品资料格式
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, Optional, List
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
|
||||
from .content_integrator import IntegratedContent
|
||||
from core.ai.ai_agent import AIAgent
|
||||
from core.config.manager import ConfigManager
|
||||
from utils.file_io import OutputManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
class TransformedContent:
    """Result of rendering integrated content into a target format."""
    original_content: IntegratedContent      # the pre-transformation input
    transformed_text: str                    # rendered output text
    format_type: str                         # key of the format that was used
    transformation_metadata: Dict[str, Any]  # bookkeeping stats for the run
    transformed_at: datetime                 # timestamp of the conversion
|
||||
|
||||
class ContentTransformer:
    """Content transformer - renders integrated content into a chosen format.

    NOTE(review): the module docstring says the conversion uses an LLM, but
    this class only fills static string templates — no model is called here.
    Confirm whether an LLM-backed path was intended.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        # Registry mapping format name -> bound transform method; extra
        # formats may be registered at runtime via add_custom_format().
        self.supported_formats = {
            'attraction_standard': self._transform_to_attraction_standard,
            'product_sales': self._transform_to_product_sales,
            'travel_guide': self._transform_to_travel_guide,
            'blog_post': self._transform_to_blog_post,
            'summary': self._transform_to_summary
        }

    def transform_content(self,
                          integrated_content: IntegratedContent,
                          format_type: str = 'summary',
                          custom_prompt: Optional[str] = None) -> TransformedContent:
        """Convert integrated content into the requested output format.

        Args:
            integrated_content: the merged document content to render
            format_type: key of the target format (see supported_formats)
            custom_prompt: optional custom prompt; the built-in transforms
                accept but ignore it — only its presence is recorded in the
                transformation metadata

        Returns:
            TransformedContent: rendered text plus transformation metadata

        Raises:
            ValueError: if format_type is not a registered format
        """
        if format_type not in self.supported_formats:
            raise ValueError(f"不支持的格式类型: {format_type}")

        logger.info(f"开始转换内容,格式: {format_type}")

        # Dispatch to the registered transform function.
        transform_func = self.supported_formats[format_type]
        transformed_text = transform_func(integrated_content, custom_prompt)

        # Bookkeeping about this transformation run.
        transformation_metadata = {
            'format_type': format_type,
            'source_document_count': integrated_content.document_count,
            'source_content_length': integrated_content.total_content_length,
            'transformed_content_length': len(transformed_text),
            'key_topics_used': integrated_content.key_topics,
            'custom_prompt_used': custom_prompt is not None
        }

        return TransformedContent(
            original_content=integrated_content,
            transformed_text=transformed_text,
            format_type=format_type,
            transformation_metadata=transformation_metadata,
            transformed_at=datetime.now()
        )

    def _transform_to_attraction_standard(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str:
        """Render as the standard attraction-information format (top 5 topics)."""
        template = """
# 景点信息整理

## 基本信息
- 文档来源: {document_count}个文档
- 主要主题: {key_topics}

## 详细内容
{combined_content}

## 内容摘要
{content_summary}

---
*基于提供的文档整理,如需更多信息请参考原始文档*
"""

        return template.format(
            document_count=content.document_count,
            key_topics=", ".join(content.key_topics[:5]),
            combined_content=content.combined_content,
            content_summary=content.content_summary
        )

    def _transform_to_product_sales(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str:
        """Render as a product-sales sheet (top 8 topics as bullet points)."""
        template = """
# 产品销售资料

## 产品特色
基于{document_count}个文档的信息整理:

{content_summary}

## 详细介绍
{combined_content}

## 关键卖点
{key_topics}

---
*内容整理自提供的文档资料*
"""

        key_points = "\n".join([f"• {topic}" for topic in content.key_topics[:8]])

        return template.format(
            document_count=content.document_count,
            content_summary=content.content_summary,
            combined_content=content.combined_content,
            key_topics=key_points
        )

    def _transform_to_travel_guide(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str:
        """Render as a travel guide (top 5 topics, comma separated)."""
        template = """
# 旅游指南

## 概述
{content_summary}

## 详细信息
{combined_content}

## 重要提示
- 信息来源: {document_count}个文档
- 关键主题: {key_topics}

---
*本指南基于提供的文档整理,出行前请核实最新信息*
"""

        return template.format(
            content_summary=content.content_summary,
            combined_content=content.combined_content,
            document_count=content.document_count,
            key_topics=", ".join(content.key_topics[:5])
        )

    def _transform_to_blog_post(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str:
        """Render as a blog post (top 10 topics as a dashed list)."""
        template = """
# 博客文章

## 前言
本文基于{document_count}个文档资料整理而成。

## 主要内容

{combined_content}

## 总结
{content_summary}

## 相关主题
{key_topics}

---
*本文内容整理自多个文档资料*
"""

        topics_list = "\n".join([f"- {topic}" for topic in content.key_topics[:10]])

        return template.format(
            document_count=content.document_count,
            combined_content=content.combined_content,
            content_summary=content.content_summary,
            key_topics=topics_list
        )

    def _transform_to_summary(self, content: IntegratedContent, custom_prompt: Optional[str] = None) -> str:
        """Render as a summary with document statistics (all topics listed)."""
        template = """
# 文档内容摘要

## 文档统计
- 文档数量: {document_count}
- 文档类型: {document_types}
- 内容长度: {content_length}字符

## 内容摘要
{content_summary}

## 关键主题
{key_topics}

## 完整内容
{combined_content}
"""

        doc_types = ", ".join([f"{k}({v}个)" for k, v in content.document_types.items()])
        topics_list = "\n".join([f"• {topic}" for topic in content.key_topics])

        return template.format(
            document_count=content.document_count,
            document_types=doc_types,
            content_length=content.total_content_length,
            content_summary=content.content_summary,
            key_topics=topics_list,
            combined_content=content.combined_content
        )

    def get_supported_formats(self) -> List[str]:
        """Return the list of registered format names."""
        return list(self.supported_formats.keys())

    def add_custom_format(self, format_name: str, transform_func):
        """Register a custom format; overwrites any existing entry silently."""
        self.supported_formats[format_name] = transform_func
        logger.info(f"添加自定义格式: {format_name}")
|
||||
356
document/text_extractor.py
Normal file
356
document/text_extractor.py
Normal file
@ -0,0 +1,356 @@
|
||||
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Text extractor module.

Supports extracting plain-text content from PDF, Word, TXT and other
document formats.
"""

import os
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime

# Optional third-party dependencies: each group is probed at import time
# and a feature flag records whether the corresponding format is usable.
try:
    import PyPDF2
    import pdfplumber
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False

try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False

try:
    import openpyxl
    from openpyxl import load_workbook
    EXCEL_AVAILABLE = True
except ImportError:
    EXCEL_AVAILABLE = False

logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
class ExtractedDocument:
    """One document's extracted text plus bookkeeping information."""
    filename: str                    # base name of the source file
    file_type: str                   # extension, e.g. ".pdf"
    content: str                     # plain-text body of the document
    metadata: Dict[str, Any]         # format-specific metadata
    extracted_at: datetime           # when extraction happened
    file_size: int                   # size of the source file in bytes
    page_count: Optional[int] = None # pages, if the format has the notion

    def __post_init__(self):
        # Coerce content to str so downstream text processing is safe.
        if isinstance(self.content, str):
            return
        self.content = str(self.content)
|
||||
|
||||
class TextExtractor:
    """Text extractor - plain-text extraction only, keeps all raw content."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        # Registry mapping lowercase extension -> bound extraction method.
        self.supported_formats = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.doc': self._extract_doc,
            '.txt': self._extract_txt,
            '.md': self._extract_txt,
            '.xlsx': self._extract_xlsx,
            '.xls': self._extract_xls,
            '.csv': self._extract_csv
        }

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract the text content of a single file.

        Args:
            file_path: path of the file to extract

        Returns:
            ExtractedDocument: extracted text plus metadata

        Raises:
            FileNotFoundError: if the file does not exist
            ValueError: if the extension is not supported
        """
        path_obj = Path(file_path)

        if not path_obj.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        file_ext = path_obj.suffix.lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"不支持的文件格式: {file_ext}")

        try:
            # File size is taken from the filesystem, not the content.
            file_size = path_obj.stat().st_size

            # Dispatch to the format-specific extractor.
            extractor = self.supported_formats[file_ext]
            content, metadata = extractor(path_obj)

            return ExtractedDocument(
                filename=path_obj.name,
                file_type=file_ext,
                content=content,
                metadata=metadata,
                extracted_at=datetime.now(),
                file_size=file_size,
                page_count=metadata.get('page_count')
            )

        except Exception as e:
            logger.error(f"提取文件 {file_path} 时出错: {str(e)}")
            raise

    def extract_batch(self, file_paths: List[str]) -> List[ExtractedDocument]:
        """Extract several files; failures yield an error-record document
        instead of aborting the batch."""
        results = []

        for file_path in file_paths:
            try:
                result = self.extract(file_path)
                results.append(result)
                logger.info(f"成功提取文件: {file_path}")
            except Exception as e:
                logger.error(f"提取文件 {file_path} 失败: {str(e)}")
                # Record the failure as a placeholder document so callers
                # keep a 1:1 mapping between inputs and outputs.
                error_doc = ExtractedDocument(
                    filename=Path(file_path).name,
                    file_type=Path(file_path).suffix.lower(),
                    content=f"提取失败: {str(e)}",
                    metadata={"error": str(e)},
                    extracted_at=datetime.now(),
                    file_size=0
                )
                results.append(error_doc)

        return results

    def _extract_pdf(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a PDF.

        Tries pdfplumber first (better text extraction), falling back to
        PyPDF2 if pdfplumber raises.
        """
        if not PDF_AVAILABLE:
            raise ImportError("需要安装 PyPDF2 和 pdfplumber: pip install PyPDF2 pdfplumber")

        content_parts = []
        metadata = {}

        try:
            # Primary path: pdfplumber.
            with pdfplumber.open(file_path) as pdf:
                metadata['page_count'] = len(pdf.pages)

                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== 第 {page_num} 页 ===\n{page_text}\n")

                # Document-level metadata, when present.
                if pdf.metadata:
                    metadata.update({
                        'title': pdf.metadata.get('Title', ''),
                        'author': pdf.metadata.get('Author', ''),
                        'subject': pdf.metadata.get('Subject', ''),
                        'creator': pdf.metadata.get('Creator', ''),
                        'producer': pdf.metadata.get('Producer', ''),
                        'creation_date': pdf.metadata.get('CreationDate', ''),
                        'modification_date': pdf.metadata.get('ModDate', '')
                    })

        except Exception as e:
            logger.warning(f"pdfplumber提取失败,尝试使用PyPDF2: {str(e)}")

            # Fallback: PyPDF2. Discard any pages pdfplumber managed to
            # extract before failing, so the fallback does not duplicate them.
            content_parts = []
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                metadata['page_count'] = len(pdf_reader.pages)

                for page_num, page in enumerate(pdf_reader.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== 第 {page_num} 页 ===\n{page_text}\n")

                # PyPDF2 prefixes metadata keys with "/".
                if pdf_reader.metadata:
                    metadata.update({
                        'title': pdf_reader.metadata.get('/Title', ''),
                        'author': pdf_reader.metadata.get('/Author', ''),
                        'subject': pdf_reader.metadata.get('/Subject', ''),
                        'creator': pdf_reader.metadata.get('/Creator', ''),
                        'producer': pdf_reader.metadata.get('/Producer', ''),
                        'creation_date': pdf_reader.metadata.get('/CreationDate', ''),
                        'modification_date': pdf_reader.metadata.get('/ModDate', '')
                    })

        content = '\n'.join(content_parts) if content_parts else ""
        return content, metadata

    def _extract_docx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text (paragraphs and tables) from a DOCX file."""
        if not DOCX_AVAILABLE:
            raise ImportError("需要安装 python-docx: pip install python-docx")

        doc = Document(str(file_path))
        content_parts = []
        metadata = {}

        # All non-empty paragraphs, in document order.
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                content_parts.append(paragraph.text)

        # Tables are flattened to tab-separated rows.
        for table in doc.tables:
            table_content = []
            for row in table.rows:
                row_content = []
                for cell in row.cells:
                    row_content.append(cell.text.strip())
                table_content.append('\t'.join(row_content))
            if table_content:
                content_parts.append('\n=== 表格 ===\n' + '\n'.join(table_content) + '\n')

        # Core document properties.
        core_props = doc.core_properties
        metadata.update({
            'title': core_props.title or '',
            'author': core_props.author or '',
            'subject': core_props.subject or '',
            'keywords': core_props.keywords or '',
            'comments': core_props.comments or '',
            'created': str(core_props.created) if core_props.created else '',
            'modified': str(core_props.modified) if core_props.modified else '',
            'last_modified_by': core_props.last_modified_by or '',
            'paragraph_count': len(doc.paragraphs),
            'table_count': len(doc.tables)
        })

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_doc(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Best-effort extraction from a legacy DOC file.

        The binary DOC format is not really parsed - the file is read as
        raw text, so the result may contain garbage.
        """
        logger.warning("DOC格式支持有限,建议转换为DOCX格式")

        # Try UTF-8 first, then GBK; errors='ignore' keeps whatever decodes.
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
        except Exception:  # was a bare except: would have swallowed KeyboardInterrupt
            with open(file_path, 'r', encoding='gbk', errors='ignore') as file:
                content = file.read()

        metadata = {'format': 'doc', 'encoding_note': '可能存在编码问题'}
        return content, metadata

    def _extract_txt(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a TXT/MD file, probing common encodings."""
        encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'utf-16']
        content = ""
        used_encoding = ""

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    content = file.read()
                used_encoding = encoding
                break
            except UnicodeDecodeError:
                continue

        if not content:
            # Last resort: decode as UTF-8 dropping undecodable bytes.
            # NOTE(review): this branch also triggers for a genuinely empty
            # file that decoded fine - harmless, but the encoding label lies.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            used_encoding = 'utf-8 (with errors ignored)'

        metadata = {
            'encoding': used_encoding,
            'line_count': len(content.splitlines()),
            'char_count': len(content)
        }

        return content, metadata

    def _extract_xlsx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from an XLSX workbook, one section per sheet."""
        if not EXCEL_AVAILABLE:
            raise ImportError("需要安装 openpyxl: pip install openpyxl")

        workbook = load_workbook(file_path, read_only=True)
        content_parts = []
        metadata = {
            'sheet_count': len(workbook.sheetnames),
            'sheet_names': workbook.sheetnames
        }

        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            content_parts.append(f"\n=== 工作表: {sheet_name} ===\n")

            for row in sheet.iter_rows(values_only=True):
                row_content = []
                for cell in row:
                    if cell is not None:
                        row_content.append(str(cell))
                    else:
                        row_content.append("")
                if any(cell.strip() for cell in row_content):  # skip blank rows
                    content_parts.append('\t'.join(row_content))

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_xls(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Best-effort extraction from a legacy XLS file (raw bytes decoded)."""
        logger.warning("XLS格式支持有限,建议转换为XLSX格式")

        # The binary XLS format is not parsed; decode raw bytes best-effort.
        try:
            with open(file_path, 'rb') as file:
                content = file.read().decode('utf-8', errors='ignore')
        except Exception:  # was a bare except: would have swallowed KeyboardInterrupt
            content = f"无法读取XLS文件: {file_path}"

        metadata = {'format': 'xls', 'note': '可能存在格式问题'}
        return content, metadata

    def _extract_csv(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a CSV file, probing common encodings."""
        encodings = ['utf-8', 'gbk', 'gb2312']
        content = ""
        used_encoding = ""

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    content = file.read()
                used_encoding = encoding
                break
            except UnicodeDecodeError:
                continue

        if not content:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            used_encoding = 'utf-8 (with errors ignored)'

        # Row/column estimate: naive comma split of the first line only,
        # so quoted fields containing commas will inflate the column count.
        lines = content.splitlines()
        row_count = len(lines)
        col_count = len(lines[0].split(',')) if lines else 0

        metadata = {
            'encoding': used_encoding,
            'row_count': row_count,
            'estimated_col_count': col_count
        }

        return content, metadata

    def get_supported_formats(self) -> List[str]:
        """Return the list of supported file extensions."""
        return list(self.supported_formats.keys())

    def is_supported(self, file_path: str) -> bool:
        """Check whether the file's extension is supported (case-insensitive)."""
        return Path(file_path).suffix.lower() in self.supported_formats
|
||||
Loading…
x
Reference in New Issue
Block a user