330 lines
11 KiB
Python
330 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
Document Processing Router
|
||
文档处理路由 - API v2
|
||
"""
|
||
|
||
import logging
|
||
from typing import Dict, Any
|
||
from fastapi import APIRouter, Depends, HTTPException
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from ..models import (
|
||
DocumentProcessingRequest,
|
||
DocumentProcessingResponse,
|
||
ApiResponse
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
@router.post("/process", response_model=DocumentProcessingResponse, summary="处理文档")
async def process_documents(
    request: DocumentProcessingRequest,
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
    """
    Process documents with the requested action.

    - **action**: processing action ("extract", "integrate", or "transform")
    - **document_paths**: list of document paths to process
    - **format_type**: target format (required when action is "transform")
    - **additional_requirements**: extra requirements forwarded to the transformer

    Returns a DocumentProcessingResponse on success. Invalid requests
    (unknown action, missing format_type) produce HTTP 400; unexpected
    failures produce HTTP 500 with the error message in the body.
    """
    try:
        logger.info(f"开始处理文档,动作: {request.action}, 文档数量: {len(request.document_paths)}")

        # The pipeline dependency supplies the pre-built processor components.
        document_processor = pipeline["document_processor"]

        if request.action == "extract":
            request_id, result_data = await document_processor.extract_documents(
                document_paths=request.document_paths
            )

        elif request.action == "integrate":
            request_id, result_data = await document_processor.integrate_documents(
                document_paths=request.document_paths
            )

        elif request.action == "transform":
            # format_type is mandatory for a transform; reject early as a
            # client error (400) instead of surfacing it as a server 500.
            if not request.format_type:
                raise HTTPException(status_code=400, detail="转换操作需要指定format_type")

            request_id, result_data = await document_processor.transform_documents(
                document_paths=request.document_paths,
                format_type=request.format_type,
                additional_requirements=request.additional_requirements
            )

        else:
            # An unknown action is a caller mistake, not a server fault -> 400.
            raise HTTPException(status_code=400, detail=f"不支持的操作: {request.action}")

        logger.info(f"文档处理完成,请求ID: {request_id}")

        return DocumentProcessingResponse(
            success=True,
            message=f"文档{request.action}处理成功",
            data=result_data,
            request_id=request_id
        )

    except HTTPException:
        # Re-raise so FastAPI renders client errors with their intended
        # status code instead of collapsing them into the generic 500 below.
        raise
    except Exception as e:
        error_msg = f"文档处理失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        return JSONResponse(
            status_code=500,
            content=DocumentProcessingResponse(
                success=False,
                message="文档处理失败",
                error=error_msg
            ).dict()
        )
|
||
|
||
|
||
@router.post("/extract", response_model=DocumentProcessingResponse, summary="提取文档内容")
async def extract_documents(
    document_paths: list[str],
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
    """
    Extract text content from the given documents.

    - **document_paths**: list of document paths to extract from

    Each path is run through the pipeline's text extractor; documents that
    yield no result are skipped. The response reports the extracted
    documents, how many succeeded, and the overall success rate.
    """
    try:
        logger.info(f"开始提取文档内容,文档数量: {len(document_paths)}")

        extractor = pipeline["text_extractor"]

        # Extract every path, keeping only the non-empty results as dicts.
        extraction_results = (extractor.extract_text(path) for path in document_paths)
        successful = [doc.to_dict() for doc in extraction_results if doc]

        total = len(successful)
        result_data = {
            "extracted_documents": successful,
            "total_processed": total,
            "success_rate": total / len(document_paths) if document_paths else 0,
        }

        logger.info(f"文档提取完成,成功处理 {total} 个文档")

        return DocumentProcessingResponse(
            success=True,
            message=f"成功提取 {total} 个文档",
            data=result_data
        )

    except Exception as e:
        error_msg = f"文档提取失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        failure = DocumentProcessingResponse(
            success=False,
            message="文档提取失败",
            error=error_msg
        )
        return JSONResponse(status_code=500, content=failure.dict())
|
||
|
||
|
||
@router.post("/integrate", response_model=DocumentProcessingResponse, summary="整合文档内容")
async def integrate_documents(
    document_paths: list[str],
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
    """
    Integrate the content of multiple documents into one result.

    - **document_paths**: list of document paths to integrate

    The documents are first extracted with the pipeline's text extractor
    (unreadable ones are dropped), then merged by the content integrator.
    The response carries the integrated content, the number of source
    documents, and the integrator's statistics.
    """
    try:
        logger.info(f"开始整合文档内容,文档数量: {len(document_paths)}")

        integrator = pipeline["content_integrator"]
        extractor = pipeline["text_extractor"]

        # Step 1: extract each document, discarding failed extractions.
        sources = [
            doc
            for doc in (extractor.extract_text(path) for path in document_paths)
            if doc
        ]

        # Step 2: merge the extracted documents into a single content object.
        merged = integrator.integrate_content(sources)

        result_data = {
            "integrated_content": merged.to_dict() if merged else None,
            "source_documents": len(sources),
            "integration_stats": merged.statistics if merged else {},
        }

        logger.info(f"文档整合完成,整合了 {len(sources)} 个文档")

        return DocumentProcessingResponse(
            success=True,
            message=f"成功整合 {len(sources)} 个文档",
            data=result_data
        )

    except Exception as e:
        error_msg = f"文档整合失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        failure = DocumentProcessingResponse(
            success=False,
            message="文档整合失败",
            error=error_msg
        )
        return JSONResponse(status_code=500, content=failure.dict())
|
||
|
||
|
||
@router.post("/transform", response_model=DocumentProcessingResponse, summary="转换文档格式")
async def transform_documents(
    document_paths: list[str],
    format_type: str,
    additional_requirements: str = "",
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
    """
    Transform documents into a target format.

    - **document_paths**: list of document paths to transform
    - **format_type**: target format identifier
    - **additional_requirements**: optional extra requirements

    Delegates to the document processor, which runs the full
    extract -> integrate -> transform pipeline and returns a request id
    alongside the transformed result.
    """
    try:
        logger.info(f"开始转换文档格式,目标格式: {format_type}")

        processor = pipeline["document_processor"]

        # Full pipeline run: extraction, integration, then transformation.
        request_id, result_data = await processor.process_documents(
            document_paths=document_paths,
            format_type=format_type,
            additional_requirements=additional_requirements
        )

        logger.info(f"文档转换完成,请求ID: {request_id}")

        return DocumentProcessingResponse(
            success=True,
            message=f"文档转换为{format_type}格式成功",
            data=result_data,
            request_id=request_id
        )

    except Exception as e:
        error_msg = f"文档转换失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        failure = DocumentProcessingResponse(
            success=False,
            message="文档转换失败",
            error=error_msg
        )
        return JSONResponse(status_code=500, content=failure.dict())
|
||
|
||
|
||
@router.get("/formats", response_model=ApiResponse, summary="获取支持的格式")
async def get_supported_formats():
    """
    List the supported input document formats and output conversion types.

    Returns a static catalogue: accepted file extensions, the available
    output format identifiers, and a human-readable description for each
    output format.
    """
    try:
        # Single source of truth: the output format list is derived from the
        # description table's keys (dict order preserves the declaration order).
        format_descriptions = {
            "attraction_standard": "景点标准化格式",
            "product_sales": "产品销售格式",
            "travel_guide": "旅游攻略格式",
            "blog_post": "博客文章格式",
            "summary": "摘要格式",
            "structured_data": "结构化数据格式",
            "marketing_copy": "营销文案格式",
            "faq": "常见问题格式",
        }

        supported_data = {
            "input_formats": [
                ".pdf", ".docx", ".xlsx", ".txt", ".csv",
                ".json", ".xml", ".html", ".md"
            ],
            "output_formats": list(format_descriptions),
            "format_descriptions": format_descriptions,
        }

        return ApiResponse(
            success=True,
            message="获取支持格式成功",
            data=supported_data
        )

    except Exception as e:
        error_msg = f"获取支持格式失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        failure = ApiResponse(
            success=False,
            message="获取支持格式失败",
            error=error_msg
        )
        return JSONResponse(status_code=500, content=failure.dict())
|
||
|
||
|
||
@router.get("/pipeline/stats", response_model=ApiResponse, summary="获取流水线统计")
async def get_pipeline_stats(
    pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
    """
    Report statistics for the document processing pipeline.

    Collects the stats exposed by each pipeline component (text extractor,
    content integrator, content transformer) plus the active
    document-processing configuration, and returns them in one payload.
    """
    try:
        # Gather per-component statistics from the injected pipeline.
        extractor_stats = pipeline["text_extractor"].get_extractor_stats()
        integrator_stats = pipeline["content_integrator"].get_integrator_stats()
        transformer_stats = pipeline["content_transformer"].get_transformer_stats()
        config_snapshot = pipeline["config"].document_processing.dict()

        stats = {
            "text_extractor": extractor_stats,
            "content_integrator": integrator_stats,
            "content_transformer": transformer_stats,
            "config": config_snapshot,
        }

        return ApiResponse(
            success=True,
            message="统计信息获取成功",
            data=stats
        )

    except Exception as e:
        error_msg = f"获取统计信息失败: {str(e)}"
        logger.error(error_msg, exc_info=True)

        failure = ApiResponse(
            success=False,
            message="获取统计信息失败",
            error=error_msg
        )
        return JSONResponse(status_code=500, content=failure.dict())