fix the document problem

This commit is contained in:
jinye_huang 2025-07-29 19:50:47 +08:00
parent 740abd06a1
commit 87a2514bb3
8 changed files with 144 additions and 45 deletions

Binary file not shown.

View File

@ -49,15 +49,40 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
try: try:
# 创建临时文件处理base64文档 # 创建临时文件处理base64文档
if request.documents: if request.documents:
temp_files = []
for doc in request.documents: for doc in request.documents:
try: try:
# 从base64内容中提取实际内容跳过data:image/jpeg;base64,这样的前缀)
content = doc.content
if ',' in content:
content = content.split(',', 1)[1]
# 创建临时文件 # 创建临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(doc.filename)[1]) as temp_file: suffix = os.path.splitext(doc.filename)[1]
if not suffix:
# 根据MIME类型推断后缀
mime_to_ext = {
'text/plain': '.txt',
'application/pdf': '.pdf',
'application/msword': '.doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
'image/jpeg': '.jpg',
'image/png': '.png'
}
suffix = mime_to_ext.get(doc.mime_type, '.bin')
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
# 解码base64内容并写入临时文件 # 解码base64内容并写入临时文件
content = base64.b64decode(doc.content) try:
temp_file.write(content) decoded_content = base64.b64decode(content)
temp_files.append(temp_file.name) temp_file.write(decoded_content)
temp_files.append(temp_file.name)
logger.info(f"成功保存临时文件: {temp_file.name}")
except Exception as e:
logger.error(f"Base64解码失败: {e}")
raise HTTPException(
status_code=400,
detail=f"文档 {doc.filename} 的Base64内容无效: {str(e)}"
)
except Exception as e: except Exception as e:
logger.error(f"处理文档 {doc.filename} 失败: {e}") logger.error(f"处理文档 {doc.filename} 失败: {e}")
raise HTTPException( raise HTTPException(
@ -70,8 +95,8 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
# 调用服务层处理 # 调用服务层处理
result = await integration_service.integrate_content( result = await integration_service.integrate_content(
document_paths=temp_files, document_paths=temp_files,
keywords=request.keywords, keywords=request.keywords or [],
cookies=request.cookies, cookies=request.cookies or "",
sort_type=request.sort_type, sort_type=request.sort_type,
note_type=request.note_type, note_type=request.note_type,
note_time=request.note_time, note_time=request.note_time,
@ -80,36 +105,7 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
query_num=request.query_num query_num=request.query_num
) )
# 转换为响应模型 return result
if result["success"]:
response = ContentIntegrationResponse(
success=True,
timestamp=result["timestamp"],
processing_time=result["processing_time"],
input_summary=result["input_summary"],
document_info=result["document_info"],
xhs_info=result["xhs_info"],
integrated_content=result["integrated_content"],
search_config=result["search_config"],
error_message=None # 成功时无错误信息
)
logger.info(f"内容整合成功,处理时间:{result['processing_time']}")
else:
from datetime import datetime
response = ContentIntegrationResponse(
success=False,
timestamp=result.get("timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")),
processing_time=result.get("processing_time", "0秒"),
input_summary=result.get("input_summary"),
document_info=result.get("document_info"),
xhs_info=result.get("xhs_info"),
integrated_content=result.get("integrated_content"),
search_config=result.get("search_config"),
error_message=result.get("error_message")
)
logger.error(f"内容整合失败:{result['error_message']}")
return response
except Exception as e: except Exception as e:
logger.error(f"内容整合接口异常:{e}", exc_info=True) logger.error(f"内容整合接口异常:{e}", exc_info=True)

View File

@ -78,7 +78,8 @@ class ContentIntegrationService:
整合结果字典 整合结果字典
""" """
start_time = time.time() start_time = time.time()
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords)}")
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords) if keywords else 0}")
try: try:
# 确保输出目录存在 # 确保输出目录存在

View File

@ -12,12 +12,46 @@ import traceback
from typing import Dict, Any, Optional, List, Tuple from typing import Dict, Any, Optional, List, Tuple
import mysql.connector import mysql.connector
from mysql.connector import pooling from mysql.connector import pooling
from functools import wraps
from core.config import ConfigManager from core.config import ConfigManager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def database_retry(max_retries: int = 3, delay: float = 1.0):
"""数据库查询重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not self.db_pool:
logger.error("数据库连接池未初始化,尝试重新初始化...")
self.db_pool = self._init_db_pool()
if not self.db_pool:
logger.error("数据库连接池重新初始化失败,返回兜底数据")
return self._get_fallback_data(func.__name__)
last_exception = None
current_delay = delay
for attempt in range(max_retries):
try:
return func(self, *args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(f"数据库查询 {func.__name__}{attempt + 1} 次失败: {e}")
if attempt < max_retries - 1:
time.sleep(current_delay)
current_delay *= 2
# 所有重试都失败,返回兜底数据
logger.error(f"数据库查询 {func.__name__}{max_retries} 次重试后仍然失败: {last_exception}")
return self._get_fallback_data(func.__name__)
return wrapper
return decorator
class DatabaseService: class DatabaseService:
"""数据库服务类""" """数据库服务类"""
@ -29,7 +63,46 @@ class DatabaseService:
config_manager: 配置管理器 config_manager: 配置管理器
""" """
self.config_manager = config_manager self.config_manager = config_manager
# 从配置获取数据库相关设置
db_config = config_manager.get_raw_config('database')
self.pool_size = db_config.get('pool_size', 10)
self.max_retry_attempts = db_config.get('max_retry_attempts', 3)
self.query_timeout = db_config.get('query_timeout', 30)
self.soft_delete_field = db_config.get('soft_delete_field', 'isDelete')
self.active_record_value = db_config.get('active_record_value', 0)
self.db_pool = self._init_db_pool() self.db_pool = self._init_db_pool()
# 兜底数据缓存
self._fallback_cache = {
'styles': [],
'audiences': [],
'scenic_spots': [],
'products': [],
'materials': []
}
def _get_fallback_data(self, func_name: str) -> Any:
"""获取兜底数据"""
fallback_mapping = {
'get_all_styles': self._fallback_cache['styles'],
'get_all_audiences': self._fallback_cache['audiences'],
'get_scenic_spot_by_id': None,
'get_product_by_id': None,
'get_style_by_id': None,
'get_audience_by_id': None,
'get_scenic_spots_by_ids': [],
'get_products_by_ids': [],
'get_styles_by_ids': [],
'get_audiences_by_ids': [],
'get_content_by_id': None,
'get_content_by_topic_index': None,
}
result = fallback_mapping.get(func_name, None)
logger.info(f"使用兜底数据 for {func_name}: {type(result)}")
return result
def _init_db_pool(self): def _init_db_pool(self):
"""初始化数据库连接池""" """初始化数据库连接池"""
@ -57,7 +130,7 @@ class DatabaseService:
# 创建连接池 # 创建连接池
pool = pooling.MySQLConnectionPool( pool = pooling.MySQLConnectionPool(
pool_name=f"database_service_pool_{int(time.time())}", pool_name=f"database_service_pool_{int(time.time())}",
pool_size=10, pool_size=self.pool_size,
**attempt["config"] **attempt["config"]
) )
@ -90,18 +163,16 @@ class DatabaseService:
return processed_config return processed_config
@database_retry(max_retries=3, delay=1.0)
def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]: def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取单个景区信息""" """根据ID获取单个景区信息"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try: try:
with self.db_pool.get_connection() as conn: with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor: with conn.cursor(dictionary=True) as cursor:
cursor.execute( cursor.execute(
"SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0", f"SELECT * FROM scenicSpot WHERE id = %s AND {self.soft_delete_field} = %s",
(spot_id,) (spot_id, self.active_record_value)
) )
result = cursor.fetchone() result = cursor.fetchone()
if result: if result:
@ -114,6 +185,7 @@ class DatabaseService:
logger.error(f"查询景区信息失败: {e}") logger.error(f"查询景区信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]: def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取单个产品信息""" """根据ID获取单个产品信息"""
if not self.db_pool: if not self.db_pool:
@ -133,6 +205,7 @@ class DatabaseService:
logger.error(f"查询产品信息失败: {e}") logger.error(f"查询产品信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]: def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]:
""" """
根据ID获取风格信息 根据ID获取风格信息
@ -166,6 +239,7 @@ class DatabaseService:
logger.error(f"查询风格信息失败: {e}") logger.error(f"查询风格信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]: def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]:
""" """
根据ID获取受众信息 根据ID获取受众信息
@ -199,6 +273,7 @@ class DatabaseService:
logger.error(f"查询受众信息失败: {e}") logger.error(f"查询受众信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]: def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
""" """
根据ID列表批量获取景区信息 根据ID列表批量获取景区信息
@ -227,6 +302,7 @@ class DatabaseService:
logger.error(f"批量查询景区信息失败: {e}") logger.error(f"批量查询景区信息失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]: def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
""" """
根据ID列表批量获取产品信息 根据ID列表批量获取产品信息
@ -255,6 +331,7 @@ class DatabaseService:
logger.error(f"批量查询产品信息失败: {e}") logger.error(f"批量查询产品信息失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]: def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
""" """
根据ID列表批量获取风格信息 根据ID列表批量获取风格信息
@ -283,6 +360,7 @@ class DatabaseService:
logger.error(f"批量查询风格信息失败: {e}") logger.error(f"批量查询风格信息失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]: def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
""" """
根据ID列表批量获取受众信息 根据ID列表批量获取受众信息
@ -311,6 +389,7 @@ class DatabaseService:
logger.error(f"批量查询受众信息失败: {e}") logger.error(f"批量查询受众信息失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]: def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
""" """
获取所有景区列表 获取所有景区列表
@ -356,6 +435,7 @@ class DatabaseService:
logger.error(f"获取景区列表失败: {e}") logger.error(f"获取景区列表失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]: def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
""" """
获取所有产品列表 获取所有产品列表
@ -403,6 +483,7 @@ class DatabaseService:
logger.error(f"获取产品列表失败: {e}") logger.error(f"获取产品列表失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def list_all_styles(self) -> List[Dict[str, Any]]: def list_all_styles(self) -> List[Dict[str, Any]]:
""" """
获取所有风格列表 获取所有风格列表
@ -425,6 +506,7 @@ class DatabaseService:
logger.error(f"获取风格列表失败: {e}") logger.error(f"获取风格列表失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def list_all_audiences(self) -> List[Dict[str, Any]]: def list_all_audiences(self) -> List[Dict[str, Any]]:
""" """
获取所有受众列表 获取所有受众列表
@ -458,6 +540,7 @@ class DatabaseService:
# 名称到ID的反向查询方法 # 名称到ID的反向查询方法
@database_retry(max_retries=3, delay=1.0)
def get_style_id_by_name(self, style_name: str) -> Optional[int]: def get_style_id_by_name(self, style_name: str) -> Optional[int]:
""" """
根据风格名称获取风格ID 根据风格名称获取风格ID
@ -490,6 +573,7 @@ class DatabaseService:
logger.error(f"查询风格ID失败: {e}") logger.error(f"查询风格ID失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_audience_id_by_name(self, audience_name: str) -> Optional[int]: def get_audience_id_by_name(self, audience_name: str) -> Optional[int]:
""" """
根据受众名称获取受众ID 根据受众名称获取受众ID
@ -522,6 +606,7 @@ class DatabaseService:
logger.error(f"查询受众ID失败: {e}") logger.error(f"查询受众ID失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]: def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]:
""" """
根据景区名称获取景区ID 根据景区名称获取景区ID
@ -554,6 +639,7 @@ class DatabaseService:
logger.error(f"查询景区ID失败: {e}") logger.error(f"查询景区ID失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_product_id_by_name(self, product_name: str) -> Optional[int]: def get_product_id_by_name(self, product_name: str) -> Optional[int]:
""" """
根据产品名称获取产品ID 根据产品名称获取产品ID
@ -588,6 +674,7 @@ class DatabaseService:
@database_retry(max_retries=3, delay=1.0)
def get_image_by_id(self, image_id: int) -> Optional[Dict[str, Any]]: def get_image_by_id(self, image_id: int) -> Optional[Dict[str, Any]]:
""" """
根据ID获取图像信息 根据ID获取图像信息
@ -621,6 +708,7 @@ class DatabaseService:
logger.error(f"查询图像信息失败: {e}") logger.error(f"查询图像信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_images_by_ids(self, image_ids: List[int]) -> List[Dict[str, Any]]: def get_images_by_ids(self, image_ids: List[int]) -> List[Dict[str, Any]]:
""" """
根据ID列表批量获取图像信息 根据ID列表批量获取图像信息
@ -649,6 +737,7 @@ class DatabaseService:
logger.error(f"批量查询图像信息失败: {e}") logger.error(f"批量查询图像信息失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_content_by_id(self, content_id: int) -> Optional[Dict[str, Any]]: def get_content_by_id(self, content_id: int) -> Optional[Dict[str, Any]]:
""" """
根据ID获取内容信息 根据ID获取内容信息
@ -682,6 +771,7 @@ class DatabaseService:
logger.error(f"查询内容信息失败: {e}") logger.error(f"查询内容信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_content_by_topic_index(self, topic_index: str) -> Optional[Dict[str, Any]]: def get_content_by_topic_index(self, topic_index: str) -> Optional[Dict[str, Any]]:
"""根据主题索引获取内容信息""" """根据主题索引获取内容信息"""
if not self.db_pool: if not self.db_pool:
@ -704,6 +794,7 @@ class DatabaseService:
logger.error(f"查询内容信息失败: {e}") logger.error(f"查询内容信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_images_by_folder_id(self, folder_id: int) -> List[Dict[str, Any]]: def get_images_by_folder_id(self, folder_id: int) -> List[Dict[str, Any]]:
""" """
根据文件夹ID获取图像列表 根据文件夹ID获取图像列表
@ -732,6 +823,7 @@ class DatabaseService:
logger.error(f"根据文件夹ID获取图像失败: {e}") logger.error(f"根据文件夹ID获取图像失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_folder_by_id(self, folder_id: int) -> Optional[Dict[str, Any]]: def get_folder_by_id(self, folder_id: int) -> Optional[Dict[str, Any]]:
""" """
根据ID获取文件夹信息 根据ID获取文件夹信息
@ -765,6 +857,7 @@ class DatabaseService:
logger.error(f"查询文件夹信息失败: {e}") logger.error(f"查询文件夹信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_related_images_for_content(self, content_id: int, limit: int = 10) -> List[Dict[str, Any]]: def get_related_images_for_content(self, content_id: int, limit: int = 10) -> List[Dict[str, Any]]:
""" """
获取与内容相关的图像列表 获取与内容相关的图像列表
@ -807,6 +900,7 @@ class DatabaseService:
# 模板相关查询方法 # 模板相关查询方法
@database_retry(max_retries=3, delay=1.0)
def get_all_poster_templates(self) -> List[Dict[str, Any]]: def get_all_poster_templates(self) -> List[Dict[str, Any]]:
""" """
获取所有海报模板 获取所有海报模板
@ -831,6 +925,7 @@ class DatabaseService:
logger.error(f"获取海报模板列表失败: {e}") logger.error(f"获取海报模板列表失败: {e}")
return [] return []
@database_retry(max_retries=3, delay=1.0)
def get_poster_template_by_id(self, template_id: str) -> Optional[Dict[str, Any]]: def get_poster_template_by_id(self, template_id: str) -> Optional[Dict[str, Any]]:
""" """
根据ID获取海报模板信息 根据ID获取海报模板信息
@ -864,6 +959,7 @@ class DatabaseService:
logger.error(f"查询模板信息失败: {e}") logger.error(f"查询模板信息失败: {e}")
return None return None
@database_retry(max_retries=3, delay=1.0)
def get_active_poster_templates(self) -> List[Dict[str, Any]]: def get_active_poster_templates(self) -> List[Dict[str, Any]]:
""" """
获取所有激活的海报模板 获取所有激活的海报模板
@ -935,6 +1031,7 @@ class DatabaseService:
except Exception as e: except Exception as e:
logger.error(f"更新模板使用统计失败: {e}") logger.error(f"更新模板使用统计失败: {e}")
@database_retry(max_retries=3, delay=1.0)
def get_template_usage_stats(self, template_id: str) -> Optional[Dict[str, Any]]: def get_template_usage_stats(self, template_id: str) -> Optional[Dict[str, Any]]:
""" """
获取模板使用统计 获取模板使用统计

View File

@ -4,5 +4,10 @@
"password": "Kj#9mP2$", "password": "Kj#9mP2$",
"database": "travel_content", "database": "travel_content",
"port": 3306, "port": 3306,
"charset": "utf8mb4" "charset": "utf8mb4",
"pool_size": 10,
"max_retry_attempts": 3,
"query_timeout": 30,
"soft_delete_field": "isDelete",
"active_record_value": 0
} }