#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Document Processing Router - API v2
"""
import logging
from typing import Dict, Any

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse

from ..models import (
    DocumentProcessingRequest,
    DocumentProcessingResponse,
    ApiResponse
)

logger = logging.getLogger(__name__)
router = APIRouter()

@router.post("/process", response_model=DocumentProcessingResponse, summary="处理文档")
async def process_documents(
request: DocumentProcessingRequest,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
处理文档
- **action**: 处理动作 (extract: 提取, integrate: 整合, transform: 转换)
- **document_paths**: 文档路径列表
- **format_type**: 转换格式类型transform动作时需要
- **additional_requirements**: 额外要求
"""
try:
logger.info(f"开始处理文档,动作: {request.action}, 文档数量: {len(request.document_paths)}")
# 获取文档处理器
document_processor = pipeline["document_processor"]
if request.action == "extract":
# 提取文档内容
request_id, result_data = await document_processor.extract_documents(
document_paths=request.document_paths
)
elif request.action == "integrate":
# 整合文档内容
request_id, result_data = await document_processor.integrate_documents(
document_paths=request.document_paths
)
elif request.action == "transform":
# 转换文档格式
if not request.format_type:
raise ValueError("转换操作需要指定format_type")
request_id, result_data = await document_processor.transform_documents(
document_paths=request.document_paths,
format_type=request.format_type,
additional_requirements=request.additional_requirements
)
else:
raise ValueError(f"不支持的操作: {request.action}")
logger.info(f"文档处理完成请求ID: {request_id}")
return DocumentProcessingResponse(
success=True,
message=f"文档{request.action}处理成功",
data=result_data,
request_id=request_id
)
except Exception as e:
error_msg = f"文档处理失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=DocumentProcessingResponse(
success=False,
message="文档处理失败",
error=error_msg
).dict()
)
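
# Illustrative client sketch (comment only, not part of the router): how the
# /process endpoint above might be called. The "/api/v2/documents" prefix and
# the port are assumptions -- the real mount point is set where this router is
# included with app.include_router(), not in this file.
#
#   import httpx
#
#   payload = {
#       "action": "transform",
#       "document_paths": ["docs/example_attraction.pdf"],
#       "format_type": "travel_guide",
#       "additional_requirements": "keep the tone informal"
#   }
#   resp = httpx.post("http://localhost:8000/api/v2/documents/process", json=payload)
#   print(resp.json()["request_id"], resp.json()["success"])
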
@router.post("/extract", response_model=DocumentProcessingResponse, summary="提取文档内容")
async def extract_documents(
document_paths: list[str],
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
提取文档内容
- **document_paths**: 文档路径列表
"""
try:
logger.info(f"开始提取文档内容,文档数量: {len(document_paths)}")
# 获取文本提取器
text_extractor = pipeline["text_extractor"]
# 提取所有文档
extracted_docs = []
for doc_path in document_paths:
extracted_doc = text_extractor.extract_text(doc_path)
if extracted_doc:
extracted_docs.append(extracted_doc.to_dict())
result_data = {
"extracted_documents": extracted_docs,
"total_processed": len(extracted_docs),
"success_rate": len(extracted_docs) / len(document_paths) if document_paths else 0
}
logger.info(f"文档提取完成,成功处理 {len(extracted_docs)} 个文档")
return DocumentProcessingResponse(
success=True,
message=f"成功提取 {len(extracted_docs)} 个文档",
data=result_data
)
except Exception as e:
error_msg = f"文档提取失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=DocumentProcessingResponse(
success=False,
message="文档提取失败",
error=error_msg
).dict()
)
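
# Illustrative sketch of calling /extract (comment only). Because
# document_paths is declared as a bare list[str] parameter, FastAPI should
# read it from the JSON request body as a plain array rather than from the
# query string. The URL prefix is an assumption, as above.
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:8000/api/v2/documents/extract",
#       json=["docs/a.pdf", "docs/b.docx"]
#   )
#   print(resp.json()["data"]["success_rate"])
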
@router.post("/integrate", response_model=DocumentProcessingResponse, summary="整合文档内容")
async def integrate_documents(
document_paths: list[str],
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
整合文档内容
- **document_paths**: 文档路径列表
"""
try:
logger.info(f"开始整合文档内容,文档数量: {len(document_paths)}")
# 获取内容整合器
content_integrator = pipeline["content_integrator"]
# 先提取文档内容
text_extractor = pipeline["text_extractor"]
extracted_docs = []
for doc_path in document_paths:
extracted_doc = text_extractor.extract_text(doc_path)
if extracted_doc:
extracted_docs.append(extracted_doc)
# 整合内容
integrated_content = content_integrator.integrate_content(extracted_docs)
result_data = {
"integrated_content": integrated_content.to_dict() if integrated_content else None,
"source_documents": len(extracted_docs),
"integration_stats": integrated_content.statistics if integrated_content else {}
}
logger.info(f"文档整合完成,整合了 {len(extracted_docs)} 个文档")
return DocumentProcessingResponse(
success=True,
message=f"成功整合 {len(extracted_docs)} 个文档",
data=result_data
)
except Exception as e:
error_msg = f"文档整合失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=DocumentProcessingResponse(
success=False,
message="文档整合失败",
error=error_msg
).dict()
)
@router.post("/transform", response_model=DocumentProcessingResponse, summary="转换文档格式")
async def transform_documents(
document_paths: list[str],
format_type: str,
additional_requirements: str = "",
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
转换文档格式
- **document_paths**: 文档路径列表
- **format_type**: 目标格式类型
- **additional_requirements**: 额外要求
"""
try:
logger.info(f"开始转换文档格式,目标格式: {format_type}")
# 获取处理器组件
document_processor = pipeline["document_processor"]
# 执行完整的处理流程:提取 -> 整合 -> 转换
request_id, result_data = await document_processor.process_documents(
document_paths=document_paths,
format_type=format_type,
additional_requirements=additional_requirements
)
logger.info(f"文档转换完成请求ID: {request_id}")
return DocumentProcessingResponse(
success=True,
message=f"文档转换为{format_type}格式成功",
data=result_data,
request_id=request_id
)
except Exception as e:
error_msg = f"文档转换失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=DocumentProcessingResponse(
success=False,
message="文档转换失败",
error=error_msg
).dict()
)
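
# Illustrative sketch of calling /transform (comment only). With this
# signature, FastAPI should treat format_type and additional_requirements as
# query parameters (singular str types) and document_paths as the JSON body
# array; if a single JSON body is preferred, the /process endpoint above
# already accepts all of these fields through DocumentProcessingRequest.
# The URL prefix is an assumption, as above.
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:8000/api/v2/documents/transform",
#       params={"format_type": "faq", "additional_requirements": ""},
#       json=["docs/a.pdf"]
#   )
#   print(resp.json()["message"])
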
@router.get("/formats", response_model=ApiResponse, summary="获取支持的格式")
async def get_supported_formats():
"""获取支持的文档格式和转换类型"""
try:
supported_data = {
"input_formats": [
".pdf", ".docx", ".xlsx", ".txt", ".csv",
".json", ".xml", ".html", ".md"
],
"output_formats": [
"attraction_standard",
"product_sales",
"travel_guide",
"blog_post",
"summary",
"structured_data",
"marketing_copy",
"faq"
],
"format_descriptions": {
"attraction_standard": "景点标准化格式",
"product_sales": "产品销售格式",
"travel_guide": "旅游攻略格式",
"blog_post": "博客文章格式",
"summary": "摘要格式",
"structured_data": "结构化数据格式",
"marketing_copy": "营销文案格式",
"faq": "常见问题格式"
}
}
return ApiResponse(
success=True,
message="获取支持格式成功",
data=supported_data
)
except Exception as e:
error_msg = f"获取支持格式失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="获取支持格式失败",
error=error_msg
).dict()
)
@router.get("/pipeline/stats", response_model=ApiResponse, summary="获取流水线统计")
async def get_pipeline_stats(
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""获取文档处理流水线的统计信息"""
try:
stats = {
"text_extractor": pipeline["text_extractor"].get_extractor_stats(),
"content_integrator": pipeline["content_integrator"].get_integrator_stats(),
"content_transformer": pipeline["content_transformer"].get_transformer_stats(),
"config": pipeline["config"].document_processing.dict()
}
return ApiResponse(
success=True,
message="统计信息获取成功",
data=stats
)
except Exception as e:
error_msg = f"获取统计信息失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="获取统计信息失败",
error=error_msg
).dict()
)
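
# Rough sketch (comment only) of the wiring that the lazy
# __import__('api_v2.main', ...) dependency in this module assumes exists in
# api_v2/main.py. The module layout, component classes, and the router prefix
# are assumptions inferred from the pipeline keys used above, not a copy of
# the real main module.
#
#   # api_v2/main.py (hypothetical)
#   from fastapi import FastAPI
#   from .routers import documents
#
#   app = FastAPI()
#   app.include_router(documents.router, prefix="/api/v2/documents", tags=["documents"])
#
#   _pipeline: dict = {}
#
#   def get_document_pipeline() -> dict:
#       # Built once at startup; returns the shared components used by the
#       # routes: document_processor, text_extractor, content_integrator,
#       # content_transformer, and config.
#       return _pipeline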