356 lines
13 KiB
Python
356 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
文本提取器模块
|
|||
|
|
支持从PDF、Word、TXT等格式的文档中提取文本内容
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import logging
|
|||
|
|
from typing import List, Dict, Any, Optional
|
|||
|
|
from pathlib import Path
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# 导入依赖库
|
|||
|
|
try:
|
|||
|
|
import PyPDF2
|
|||
|
|
import pdfplumber
|
|||
|
|
PDF_AVAILABLE = True
|
|||
|
|
except ImportError:
|
|||
|
|
PDF_AVAILABLE = False
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
from docx import Document
|
|||
|
|
DOCX_AVAILABLE = True
|
|||
|
|
except ImportError:
|
|||
|
|
DOCX_AVAILABLE = False
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
import openpyxl
|
|||
|
|
from openpyxl import load_workbook
|
|||
|
|
EXCEL_AVAILABLE = True
|
|||
|
|
except ImportError:
|
|||
|
|
EXCEL_AVAILABLE = False
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
@dataclass
class ExtractedDocument:
    """Container for the text and metadata extracted from one document."""

    filename: str
    file_type: str
    content: str  # plain-text content
    metadata: Dict[str, Any]  # document metadata
    extracted_at: datetime
    file_size: int
    page_count: Optional[int] = None

    def __post_init__(self):
        """Coerce ``content`` to ``str`` when a non-string value was supplied."""
        if isinstance(self.content, str):
            return
        self.content = str(self.content)
|
|||
|
|
|
|||
|
|
class TextExtractor:
    """Plain-text extractor that keeps all of the original content.

    ``extract`` dispatches on the lower-cased file suffix to one of the
    ``_extract_*`` methods; each of those returns a ``(content, metadata)``
    pair.
    """

    # Output metadata key -> PDF document-info field name. pdfplumber exposes
    # the bare names; PyPDF2 exposes the same names prefixed with "/".
    _PDF_METADATA_FIELDS = (
        ('title', 'Title'),
        ('author', 'Author'),
        ('subject', 'Subject'),
        ('creator', 'Creator'),
        ('producer', 'Producer'),
        ('creation_date', 'CreationDate'),
        ('modification_date', 'ModDate'),
    )

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Store the configuration and build the suffix dispatch table.

        Args:
            config: Optional settings dict, kept as ``self.config``; an empty
                dict is used when nothing (or a falsy value) is passed.
        """
        self.config = config or {}
        self.supported_formats = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.doc': self._extract_doc,
            '.txt': self._extract_txt,
            '.md': self._extract_txt,
            '.xlsx': self._extract_xlsx,
            '.xls': self._extract_xls,
            '.csv': self._extract_csv
        }

    def extract(self, file_path: str) -> "ExtractedDocument":
        """Extract the text content of a single file.

        Args:
            file_path: Path of the file to read.

        Returns:
            An ``ExtractedDocument`` holding the text, the metadata produced
            by the format-specific extractor, the file size and the
            extraction timestamp.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file suffix is not in ``supported_formats``.
            Exception: Any error raised by the format-specific extractor is
                logged and re-raised unchanged.
        """
        path_obj = Path(file_path)

        if not path_obj.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        file_ext = path_obj.suffix.lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"不支持的文件格式: {file_ext}")

        try:
            file_size = path_obj.stat().st_size

            extractor = self.supported_formats[file_ext]
            content, metadata = extractor(path_obj)

            return ExtractedDocument(
                filename=path_obj.name,
                file_type=file_ext,
                content=content,
                metadata=metadata,
                extracted_at=datetime.now(),
                file_size=file_size,
                page_count=metadata.get('page_count')
            )

        except Exception as e:
            logger.error(f"提取文件 {file_path} 时出错: {str(e)}")
            raise

    def extract_batch(self, file_paths: List[str]) -> List["ExtractedDocument"]:
        """Extract several files, never aborting the batch on a failure.

        Args:
            file_paths: Paths of the files to process, in order.

        Returns:
            One ``ExtractedDocument`` per input path, in input order. A file
            that fails is represented by a placeholder whose ``content``
            starts with the failure message, whose ``metadata`` carries an
            ``"error"`` entry and whose ``file_size`` is 0.
        """
        results = []

        for file_path in file_paths:
            try:
                result = self.extract(file_path)
                results.append(result)
                logger.info(f"成功提取文件: {file_path}")
            except Exception as e:
                logger.error(f"提取文件 {file_path} 失败: {str(e)}")
                # Record the failure instead of dropping the entry.
                failed_path = Path(file_path)
                error_doc = ExtractedDocument(
                    filename=failed_path.name,
                    file_type=failed_path.suffix.lower(),
                    content=f"提取失败: {str(e)}",
                    metadata={"error": str(e)},
                    extracted_at=datetime.now(),
                    file_size=0
                )
                results.append(error_doc)

        return results

    def _read_text_with_fallback(self, file_path: Path, encodings: List[str]) -> tuple[str, str]:
        """Read a text file using the first encoding that decodes it.

        Args:
            file_path: File to read.
            encodings: Candidate encodings, tried in order.

        Returns:
            ``(content, used_encoding)``. When every candidate raises
            ``UnicodeDecodeError`` the file is read as UTF-8 with undecodable
            bytes dropped and the encoding is reported as
            ``'utf-8 (with errors ignored)'``.
        """
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    # Return on the first successful decode, even when the
                    # file is empty, so the reported encoding stays accurate.
                    return file.read(), encoding
            except UnicodeDecodeError:
                continue

        # Last resort: ignore undecodable bytes.
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            return file.read(), 'utf-8 (with errors ignored)'

    def _extract_pdf(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract the plain text of a PDF file.

        pdfplumber is tried first; if it raises, PyPDF2 is used instead.

        Args:
            file_path: Path of the PDF file.

        Returns:
            ``(content, metadata)``. Each page with text is emitted under a
            page header line; ``metadata`` holds ``page_count`` plus the
            document-info fields when the PDF provides them.

        Raises:
            ImportError: If PyPDF2 / pdfplumber could not be imported.
        """
        if not PDF_AVAILABLE:
            raise ImportError("需要安装 PyPDF2 和 pdfplumber: pip install PyPDF2 pdfplumber")

        content_parts = []
        metadata = {}

        try:
            # Preferred path: pdfplumber.
            with pdfplumber.open(file_path) as pdf:
                metadata['page_count'] = len(pdf.pages)

                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== 第 {page_num} 页 ===\n{page_text}\n")

                if pdf.metadata:
                    metadata.update({
                        key: pdf.metadata.get(field, '')
                        for key, field in self._PDF_METADATA_FIELDS
                    })

        except Exception as e:
            logger.warning(f"pdfplumber提取失败,尝试使用PyPDF2: {str(e)}")

            # Discard pages gathered before the failure; the fallback walks
            # the whole document again and would otherwise duplicate them.
            content_parts.clear()

            # Fallback path: PyPDF2.
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                metadata['page_count'] = len(pdf_reader.pages)

                for page_num, page in enumerate(pdf_reader.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== 第 {page_num} 页 ===\n{page_text}\n")

                if pdf_reader.metadata:
                    metadata.update({
                        key: pdf_reader.metadata.get('/' + field, '')
                        for key, field in self._PDF_METADATA_FIELDS
                    })

        content = '\n'.join(content_parts) if content_parts else ""
        return content, metadata

    def _extract_docx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract the plain text of a DOCX file.

        Args:
            file_path: Path of the DOCX file.

        Returns:
            ``(content, metadata)``. Non-blank paragraphs come first, then
            each table as tab-separated rows under a table header line.
            ``metadata`` holds the core document properties plus paragraph
            and table counts.

        Raises:
            ImportError: If python-docx could not be imported.
        """
        if not DOCX_AVAILABLE:
            raise ImportError("需要安装 python-docx: pip install python-docx")

        doc = Document(str(file_path))
        metadata = {}

        # Paragraph text, skipping whitespace-only paragraphs.
        content_parts = [
            paragraph.text
            for paragraph in doc.paragraphs
            if paragraph.text.strip()
        ]

        # Tables: one line per row, cells separated by tabs.
        for table in doc.tables:
            table_content = [
                '\t'.join(cell.text.strip() for cell in row.cells)
                for row in table.rows
            ]
            if table_content:
                content_parts.append('\n=== 表格 ===\n' + '\n'.join(table_content) + '\n')

        # Core document properties.
        core_props = doc.core_properties
        metadata.update({
            'title': core_props.title or '',
            'author': core_props.author or '',
            'subject': core_props.subject or '',
            'keywords': core_props.keywords or '',
            'comments': core_props.comments or '',
            'created': str(core_props.created) if core_props.created else '',
            'modified': str(core_props.modified) if core_props.modified else '',
            'last_modified_by': core_props.last_modified_by or '',
            'paragraph_count': len(doc.paragraphs),
            'table_count': len(doc.tables)
        })

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_doc(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Best-effort text read of a legacy DOC file.

        The file is not parsed as a Word document; it is simply read as text
        with undecodable bytes dropped, so the result may contain noise.

        Args:
            file_path: Path of the DOC file.

        Returns:
            ``(content, metadata)`` where ``metadata`` flags the format and
            the possible encoding problems.
        """
        # DOC is a complex format; converting to DOCX or using a dedicated
        # library is recommended.
        logger.warning("DOC格式支持有限,建议转换为DOCX格式")

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt / SystemExit must
            # propagate instead of triggering the retry.
            with open(file_path, 'r', encoding='gbk', errors='ignore') as file:
                content = file.read()

        metadata = {'format': 'doc', 'encoding_note': '可能存在编码问题'}
        return content, metadata

    def _extract_txt(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract the text of a TXT/MD file.

        Args:
            file_path: Path of the text file.

        Returns:
            ``(content, metadata)`` where ``metadata`` holds the encoding
            that decoded the file, the line count and the character count.
        """
        content, used_encoding = self._read_text_with_fallback(
            file_path, ['utf-8', 'gbk', 'gb2312', 'big5', 'utf-16']
        )

        metadata = {
            'encoding': used_encoding,
            'line_count': len(content.splitlines()),
            'char_count': len(content)
        }

        return content, metadata

    def _extract_xlsx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract the cell text of an XLSX workbook.

        Args:
            file_path: Path of the XLSX file.

        Returns:
            ``(content, metadata)``. Every sheet is emitted under a sheet
            header line, one tab-separated line per non-empty row;
            ``metadata`` holds the sheet count and sheet names.

        Raises:
            ImportError: If openpyxl could not be imported.
        """
        if not EXCEL_AVAILABLE:
            raise ImportError("需要安装 openpyxl: pip install openpyxl")

        workbook = load_workbook(file_path, read_only=True)
        try:
            sheet_names = list(workbook.sheetnames)
            content_parts = []
            metadata = {
                'sheet_count': len(sheet_names),
                'sheet_names': sheet_names
            }

            for sheet_name in sheet_names:
                sheet = workbook[sheet_name]
                content_parts.append(f"\n=== 工作表: {sheet_name} ===\n")

                for row in sheet.iter_rows(values_only=True):
                    row_content = ["" if cell is None else str(cell) for cell in row]
                    if any(text.strip() for text in row_content):  # skip empty rows
                        content_parts.append('\t'.join(row_content))
        finally:
            # A read-only workbook keeps the file open until closed explicitly.
            workbook.close()

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_xls(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Best-effort text read of a legacy XLS file.

        The file is not parsed as a spreadsheet; its raw bytes are decoded as
        UTF-8 with undecodable bytes dropped, so the result may contain noise.

        Args:
            file_path: Path of the XLS file.

        Returns:
            ``(content, metadata)``. If the file cannot be read, ``content``
            is a message naming the file instead of raising.
        """
        logger.warning("XLS格式支持有限,建议转换为XLSX格式")

        try:
            with open(file_path, 'rb') as file:
                content = file.read().decode('utf-8', errors='ignore')
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt / SystemExit must
            # propagate instead of being turned into a placeholder string.
            content = f"无法读取XLS文件: {file_path}"

        metadata = {'format': 'xls', 'note': '可能存在格式问题'}
        return content, metadata

    def _extract_csv(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract the raw text of a CSV file.

        Args:
            file_path: Path of the CSV file.

        Returns:
            ``(content, metadata)`` where ``metadata`` holds the encoding
            that decoded the file, the number of text lines and the column
            count estimated from the first line.
        """
        import csv

        content, used_encoding = self._read_text_with_fallback(
            file_path, ['utf-8', 'gbk', 'gb2312']
        )

        lines = content.splitlines()
        row_count = len(lines)

        if lines:
            try:
                # Parse the first line as CSV so that commas inside quoted
                # fields are not counted as column separators.
                col_count = len(next(csv.reader([lines[0]])))
            except csv.Error:
                col_count = len(lines[0].split(','))
        else:
            col_count = 0

        metadata = {
            'encoding': used_encoding,
            'row_count': row_count,
            'estimated_col_count': col_count
        }

        return content, metadata

    def get_supported_formats(self) -> List[str]:
        """Return the supported file suffixes (lower-case, with leading dot)."""
        return list(self.supported_formats.keys())

    def is_supported(self, file_path: str) -> bool:
        """Return True when the suffix of ``file_path`` is supported (case-insensitive)."""
        return Path(file_path).suffix.lower() in self.supported_formats
|