#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Document Processing Router
文档处理路由 - API v2
"""
import logging
from typing import Dict, Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from ..models import (
    DocumentProcessingRequest,
    DocumentProcessingResponse,
    ApiResponse
)
from api_v2.main import get_document_pipeline

logger = logging.getLogger(__name__)
router = APIRouter()
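
# A sketch of how this router is expected to be wired up, assuming the module
# lives at api_v2.routers.documents and that api_v2.main exposes the FastAPI
# app (both are assumptions, not confirmed by this file):
#
#     from api_v2.routers import documents
#     app.include_router(documents.router, prefix="/api/v2/documents", tags=["documents"])
#
# get_document_pipeline (imported above) is assumed to return a dict exposing
# the keys used throughout this module: "document_processor", "text_extractor",
# "content_integrator", "content_transformer", and "config".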
@router.post("/process", response_model=DocumentProcessingResponse, summary="处理文档")
async def process_documents(
request: DocumentProcessingRequest,
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
处理文档
- **action**: 处理动作 (extract: 提取, integrate: 整合, transform: 转换)
- **document_paths**: 文档路径列表
- **format_type**: 转换格式类型transform动作时需要
- **additional_requirements**: 额外要求
"""
    try:
        logger.info(f"Processing documents, action: {request.action}, document count: {len(request.document_paths)}")
        # Fetch the document processor from the pipeline
        document_processor = pipeline["document_processor"]
        if request.action == "extract":
            # Extract document content
            request_id, result_data = await document_processor.extract_documents(
                document_paths=request.document_paths
            )
        elif request.action == "integrate":
            # Integrate document content
            request_id, result_data = await document_processor.integrate_documents(
                document_paths=request.document_paths
            )
        elif request.action == "transform":
            # Transform the document format
            if not request.format_type:
                raise ValueError("The transform action requires format_type")
            request_id, result_data = await document_processor.transform_documents(
                document_paths=request.document_paths,
                format_type=request.format_type,
                additional_requirements=request.additional_requirements
            )
        else:
            raise ValueError(f"Unsupported action: {request.action}")
        logger.info(f"Document processing finished, request ID: {request_id}")
        return DocumentProcessingResponse(
            success=True,
            message=f"Document {request.action} completed successfully",
            data=result_data,
            request_id=request_id
        )
    except Exception as e:
        error_msg = f"Document processing failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return JSONResponse(
            status_code=500,
            content=DocumentProcessingResponse(
                success=False,
                message="Document processing failed",
                error=error_msg
            ).dict()
        )
@router.post("/extract", response_model=DocumentProcessingResponse, summary="提取文档内容")
async def extract_documents(
document_paths: list[str],
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
提取文档内容
- **document_paths**: 文档路径列表
"""
    try:
        logger.info(f"Extracting document content, document count: {len(document_paths)}")
        # Fetch the text extractor from the pipeline
        text_extractor = pipeline["text_extractor"]
        # Extract every document; paths that yield no content are skipped
        extracted_docs = []
        for doc_path in document_paths:
            extracted_doc = text_extractor.extract_text(doc_path)
            if extracted_doc:
                extracted_docs.append(extracted_doc.to_dict())
        result_data = {
            "extracted_documents": extracted_docs,
            "total_processed": len(extracted_docs),
            "success_rate": len(extracted_docs) / len(document_paths) if document_paths else 0
        }
        logger.info(f"Extraction finished, {len(extracted_docs)} document(s) processed successfully")
        return DocumentProcessingResponse(
            success=True,
            message=f"Successfully extracted {len(extracted_docs)} document(s)",
            data=result_data
        )
    except Exception as e:
        error_msg = f"Document extraction failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return JSONResponse(
            status_code=500,
            content=DocumentProcessingResponse(
                success=False,
                message="Document extraction failed",
                error=error_msg
            ).dict()
        )
@router.post("/integrate", response_model=DocumentProcessingResponse, summary="整合文档内容")
async def integrate_documents(
document_paths: list[str],
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
整合文档内容
- **document_paths**: 文档路径列表
"""
    try:
        logger.info(f"Integrating document content, document count: {len(document_paths)}")
        # Fetch the content integrator from the pipeline
        content_integrator = pipeline["content_integrator"]
        # First extract the content of every document
        text_extractor = pipeline["text_extractor"]
        extracted_docs = []
        for doc_path in document_paths:
            extracted_doc = text_extractor.extract_text(doc_path)
            if extracted_doc:
                extracted_docs.append(extracted_doc)
        # Integrate the extracted content
        integrated_content = content_integrator.integrate_content(extracted_docs)
        result_data = {
            "integrated_content": integrated_content.to_dict() if integrated_content else None,
            "source_documents": len(extracted_docs),
            "integration_stats": integrated_content.statistics if integrated_content else {}
        }
        logger.info(f"Integration finished, {len(extracted_docs)} document(s) integrated")
        return DocumentProcessingResponse(
            success=True,
            message=f"Successfully integrated {len(extracted_docs)} document(s)",
            data=result_data
        )
    except Exception as e:
        error_msg = f"Document integration failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return JSONResponse(
            status_code=500,
            content=DocumentProcessingResponse(
                success=False,
                message="Document integration failed",
                error=error_msg
            ).dict()
        )
@router.post("/transform", response_model=DocumentProcessingResponse, summary="转换文档格式")
async def transform_documents(
document_paths: list[str],
format_type: str,
additional_requirements: str = "",
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""
转换文档格式
- **document_paths**: 文档路径列表
- **format_type**: 目标格式类型
- **additional_requirements**: 额外要求
"""
    try:
        logger.info(f"Transforming documents, target format: {format_type}")
        # Fetch the document processor from the pipeline
        document_processor = pipeline["document_processor"]
        # Run the full processing flow: extract -> integrate -> transform
        request_id, result_data = await document_processor.process_documents(
            document_paths=document_paths,
            format_type=format_type,
            additional_requirements=additional_requirements
        )
        logger.info(f"Document transformation finished, request ID: {request_id}")
        return DocumentProcessingResponse(
            success=True,
            message=f"Documents successfully transformed to the {format_type} format",
            data=result_data,
            request_id=request_id
        )
    except Exception as e:
        error_msg = f"Document transformation failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return JSONResponse(
            status_code=500,
            content=DocumentProcessingResponse(
                success=False,
                message="Document transformation failed",
                error=error_msg
            ).dict()
        )
@router.get("/formats", response_model=ApiResponse, summary="获取支持的格式")
async def get_supported_formats():
"""获取支持的文档格式和转换类型"""
try:
supported_data = {
"input_formats": [
".pdf", ".docx", ".xlsx", ".txt", ".csv",
".json", ".xml", ".html", ".md"
],
"output_formats": [
"attraction_standard",
"product_sales",
"travel_guide",
"blog_post",
"summary",
"structured_data",
"marketing_copy",
"faq"
],
"format_descriptions": {
"attraction_standard": "景点标准化格式",
"product_sales": "产品销售格式",
"travel_guide": "旅游攻略格式",
"blog_post": "博客文章格式",
"summary": "摘要格式",
"structured_data": "结构化数据格式",
"marketing_copy": "营销文案格式",
"faq": "常见问题格式"
}
}
return ApiResponse(
success=True,
message="获取支持格式成功",
data=supported_data
)
except Exception as e:
error_msg = f"获取支持格式失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="获取支持格式失败",
error=error_msg
).dict()
)
@router.get("/pipeline/stats", response_model=ApiResponse, summary="获取流水线统计")
async def get_pipeline_stats(
pipeline: Dict[str, Any] = Depends(__import__('api_v2.main', fromlist=['get_document_pipeline']).get_document_pipeline)
):
"""获取文档处理流水线的统计信息"""
try:
stats = {
"text_extractor": pipeline["text_extractor"].get_extractor_stats(),
"content_integrator": pipeline["content_integrator"].get_integrator_stats(),
"content_transformer": pipeline["content_transformer"].get_transformer_stats(),
"config": pipeline["config"].document_processing.dict()
}
return ApiResponse(
success=True,
message="统计信息获取成功",
data=stats
)
except Exception as e:
error_msg = f"获取统计信息失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return JSONResponse(
status_code=500,
content=ApiResponse(
success=False,
message="获取统计信息失败",
error=error_msg
).dict()
)