#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Text extractor module.

Extracts plain-text content from PDF, Word, TXT and other document formats.
"""

import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime

# Optional dependencies: each format handler checks the corresponding flag.
try:
    import PyPDF2
    import pdfplumber
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False

try:
    from docx import Document
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False

try:
    from openpyxl import load_workbook
    EXCEL_AVAILABLE = True
except ImportError:
    EXCEL_AVAILABLE = False

logger = logging.getLogger(__name__)


@dataclass
class ExtractedDocument:
    """Result of extracting one document."""
    filename: str
    file_type: str
    content: str                 # plain-text content
    metadata: Dict[str, Any]     # document metadata
    extracted_at: datetime
    file_size: int
    page_count: Optional[int] = None

    def __post_init__(self):
        # Make sure content is always a string.
        if not isinstance(self.content, str):
            self.content = str(self.content)


class TextExtractor:
    """Plain-text extractor: extracts raw text only and keeps all original content."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.supported_formats = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.doc': self._extract_doc,
            '.txt': self._extract_txt,
            '.md': self._extract_txt,
            '.xlsx': self._extract_xlsx,
            '.xls': self._extract_xls,
            '.csv': self._extract_csv,
        }

    def extract(self, file_path: str) -> ExtractedDocument:
        """Extract the text content of a single file."""
        path_obj = Path(file_path)

        if not path_obj.exists():
            raise FileNotFoundError(f"File does not exist: {file_path}")

        file_ext = path_obj.suffix.lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_ext}")

        try:
            # File info
            file_size = path_obj.stat().st_size

            # Dispatch to the format-specific extractor
            extractor = self.supported_formats[file_ext]
            content, metadata = extractor(path_obj)

            return ExtractedDocument(
                filename=path_obj.name,
                file_type=file_ext,
                content=content,
                metadata=metadata,
                extracted_at=datetime.now(),
                file_size=file_size,
                page_count=metadata.get('page_count'),
            )
        except Exception as e:
            logger.error(f"Error extracting file {file_path}: {e}")
            raise

    def extract_batch(self, file_paths: List[str]) -> List[ExtractedDocument]:
        """Extract the text content of multiple files."""
        results = []
        for file_path in file_paths:
            try:
                result = self.extract(file_path)
                results.append(result)
                logger.info(f"Successfully extracted file: {file_path}")
            except Exception as e:
                logger.error(f"Failed to extract file {file_path}: {e}")
                # Record the failure as an error document instead of aborting the batch.
                error_doc = ExtractedDocument(
                    filename=Path(file_path).name,
                    file_type=Path(file_path).suffix.lower(),
                    content=f"Extraction failed: {e}",
                    metadata={"error": str(e)},
                    extracted_at=datetime.now(),
                    file_size=0,
                )
                results.append(error_doc)
        return results

    def _extract_pdf(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a PDF file."""
        if not PDF_AVAILABLE:
            raise ImportError("PyPDF2 and pdfplumber are required: pip install PyPDF2 pdfplumber")

        content_parts = []
        metadata = {}

        try:
            # Prefer pdfplumber, which generally produces better text extraction.
            with pdfplumber.open(file_path) as pdf:
                metadata['page_count'] = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== Page {page_num} ===\n{page_text}\n")
                # Document metadata
                if pdf.metadata:
                    metadata.update({
                        'title': pdf.metadata.get('Title', ''),
                        'author': pdf.metadata.get('Author', ''),
                        'subject': pdf.metadata.get('Subject', ''),
                        'creator': pdf.metadata.get('Creator', ''),
                        'producer': pdf.metadata.get('Producer', ''),
                        'creation_date': pdf.metadata.get('CreationDate', ''),
                        'modification_date': pdf.metadata.get('ModDate', ''),
                    })
        except Exception as e:
            logger.warning(f"pdfplumber extraction failed, falling back to PyPDF2: {e}")
            # Fallback: PyPDF2
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                metadata['page_count'] = len(pdf_reader.pages)
                for page_num, page in enumerate(pdf_reader.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        content_parts.append(f"=== Page {page_num} ===\n{page_text}\n")
                # Document metadata
                if pdf_reader.metadata:
                    metadata.update({
                        'title': pdf_reader.metadata.get('/Title', ''),
                        'author': pdf_reader.metadata.get('/Author', ''),
                        'subject': pdf_reader.metadata.get('/Subject', ''),
                        'creator': pdf_reader.metadata.get('/Creator', ''),
                        'producer': pdf_reader.metadata.get('/Producer', ''),
                        'creation_date': pdf_reader.metadata.get('/CreationDate', ''),
                        'modification_date': pdf_reader.metadata.get('/ModDate', ''),
                    })

        content = '\n'.join(content_parts) if content_parts else ""
        return content, metadata

    def _extract_docx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a DOCX file."""
        if not DOCX_AVAILABLE:
            raise ImportError("python-docx is required: pip install python-docx")

        doc = Document(str(file_path))
        content_parts = []
        metadata = {}

        # Paragraph text
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                content_parts.append(paragraph.text)

        # Table content, one tab-separated line per row
        for table in doc.tables:
            table_content = []
            for row in table.rows:
                row_content = [cell.text.strip() for cell in row.cells]
                table_content.append('\t'.join(row_content))
            if table_content:
                content_parts.append('\n=== Table ===\n' + '\n'.join(table_content) + '\n')

        # Document properties
        core_props = doc.core_properties
        metadata.update({
            'title': core_props.title or '',
            'author': core_props.author or '',
            'subject': core_props.subject or '',
            'keywords': core_props.keywords or '',
            'comments': core_props.comments or '',
            'created': str(core_props.created) if core_props.created else '',
            'modified': str(core_props.modified) if core_props.modified else '',
            'last_modified_by': core_props.last_modified_by or '',
            'paragraph_count': len(doc.paragraphs),
            'table_count': len(doc.tables),
        })

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_doc(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a legacy DOC file."""
        # The DOC format is complex; converting to DOCX or using a dedicated library is recommended.
        logger.warning("DOC support is limited; converting to DOCX is recommended")

        # Best effort: read the file as text.
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
        except (OSError, UnicodeDecodeError):
            with open(file_path, 'r', encoding='gbk', errors='ignore') as file:
                content = file.read()

        metadata = {'format': 'doc', 'encoding_note': 'encoding issues are possible'}
        return content, metadata

    def _extract_txt(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a TXT/MD file."""
        encodings = ['utf-8', 'gbk', 'gb2312', 'big5', 'utf-16']
        content = ""
        used_encoding = ""

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    content = file.read()
                used_encoding = encoding
                break
            except UnicodeDecodeError:
                continue

        if not content:
            # Last resort: ignore decoding errors.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            used_encoding = 'utf-8 (with errors ignored)'

        metadata = {
            'encoding': used_encoding,
            'line_count': len(content.splitlines()),
            'char_count': len(content),
        }
        return content, metadata

    def _extract_xlsx(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from an XLSX file."""
        if not EXCEL_AVAILABLE:
            raise ImportError("openpyxl is required: pip install openpyxl")

        workbook = load_workbook(file_path, read_only=True)
        content_parts = []
        metadata = {
            'sheet_count': len(workbook.sheetnames),
            'sheet_names': workbook.sheetnames,
        }

        try:
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                content_parts.append(f"\n=== Sheet: {sheet_name} ===\n")
                for row in sheet.iter_rows(values_only=True):
                    row_content = [str(cell) if cell is not None else "" for cell in row]
                    if any(cell.strip() for cell in row_content):  # skip empty rows
                        content_parts.append('\t'.join(row_content))
        finally:
            # Read-only workbooks keep the file handle open until closed.
            workbook.close()

        content = '\n'.join(content_parts)
        return content, metadata

    def _extract_xls(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a legacy XLS file."""
        logger.warning("XLS support is limited; converting to XLSX is recommended")

        # Crude text extraction: decode the raw bytes and ignore errors.
        try:
            with open(file_path, 'rb') as file:
                content = file.read().decode('utf-8', errors='ignore')
        except OSError:
            content = f"Unable to read XLS file: {file_path}"

        metadata = {'format': 'xls', 'note': 'formatting issues are possible'}
        return content, metadata

    def _extract_csv(self, file_path: Path) -> tuple[str, Dict[str, Any]]:
        """Extract plain text from a CSV file."""
        encodings = ['utf-8', 'gbk', 'gb2312']
        content = ""
        used_encoding = ""

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    content = file.read()
                used_encoding = encoding
                break
            except UnicodeDecodeError:
                continue

        if not content:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            used_encoding = 'utf-8 (with errors ignored)'

        # Row and column counts (columns are estimated from the first line).
        lines = content.splitlines()
        row_count = len(lines)
        col_count = len(lines[0].split(',')) if lines else 0

        metadata = {
            'encoding': used_encoding,
            'row_count': row_count,
            'estimated_col_count': col_count,
        }
        return content, metadata

    def get_supported_formats(self) -> List[str]:
        """Return the list of supported file extensions."""
        return list(self.supported_formats.keys())

    def is_supported(self, file_path: str) -> bool:
        """Check whether the file's format is supported."""
        return Path(file_path).suffix.lower() in self.supported_formats