diff --git a/api/__pycache__/main.cpython-312.pyc b/api/__pycache__/main.cpython-312.pyc index 834e7c3..a0d2a64 100644 Binary files a/api/__pycache__/main.cpython-312.pyc and b/api/__pycache__/main.cpython-312.pyc differ diff --git a/api/routers/__pycache__/content_integration.cpython-312.pyc b/api/routers/__pycache__/content_integration.cpython-312.pyc index a923656..d3bd16e 100644 Binary files a/api/routers/__pycache__/content_integration.cpython-312.pyc and b/api/routers/__pycache__/content_integration.cpython-312.pyc differ diff --git a/api/routers/content_integration.py b/api/routers/content_integration.py index f384433..0cd3bad 100644 --- a/api/routers/content_integration.py +++ b/api/routers/content_integration.py @@ -49,15 +49,40 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr try: # 创建临时文件处理base64文档 if request.documents: - temp_files = [] for doc in request.documents: try: + # 从base64内容中提取实际内容(跳过data:image/jpeg;base64,这样的前缀) + content = doc.content + if ',' in content: + content = content.split(',', 1)[1] + # 创建临时文件 - with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(doc.filename)[1]) as temp_file: + suffix = os.path.splitext(doc.filename)[1] + if not suffix: + # 根据MIME类型推断后缀 + mime_to_ext = { + 'text/plain': '.txt', + 'application/pdf': '.pdf', + 'application/msword': '.doc', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx', + 'image/jpeg': '.jpg', + 'image/png': '.png' + } + suffix = mime_to_ext.get(doc.mime_type, '.bin') + + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: # 解码base64内容并写入临时文件 - content = base64.b64decode(doc.content) - temp_file.write(content) - temp_files.append(temp_file.name) + try: + decoded_content = base64.b64decode(content) + temp_file.write(decoded_content) + temp_files.append(temp_file.name) + logger.info(f"成功保存临时文件: {temp_file.name}") + except Exception as e: + logger.error(f"Base64解码失败: {e}") + raise HTTPException( + status_code=400, + detail=f"文档 {doc.filename} 的Base64内容无效: {str(e)}" + ) except Exception as e: logger.error(f"处理文档 {doc.filename} 失败: {e}") raise HTTPException( @@ -70,8 +95,8 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr # 调用服务层处理 result = await integration_service.integrate_content( document_paths=temp_files, - keywords=request.keywords, - cookies=request.cookies, + keywords=request.keywords or [], + cookies=request.cookies or "", sort_type=request.sort_type, note_type=request.note_type, note_time=request.note_time, @@ -80,36 +105,7 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr query_num=request.query_num ) - # 转换为响应模型 - if result["success"]: - response = ContentIntegrationResponse( - success=True, - timestamp=result["timestamp"], - processing_time=result["processing_time"], - input_summary=result["input_summary"], - document_info=result["document_info"], - xhs_info=result["xhs_info"], - integrated_content=result["integrated_content"], - search_config=result["search_config"], - error_message=None # 成功时无错误信息 - ) - logger.info(f"内容整合成功,处理时间:{result['processing_time']}") - else: - from datetime import datetime - response = ContentIntegrationResponse( - success=False, - timestamp=result.get("timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")), - processing_time=result.get("processing_time", "0秒"), - input_summary=result.get("input_summary"), - document_info=result.get("document_info"), - xhs_info=result.get("xhs_info"), - integrated_content=result.get("integrated_content"), - search_config=result.get("search_config"), - error_message=result.get("error_message") - ) - logger.error(f"内容整合失败:{result['error_message']}") - - return response + return result except Exception as e: logger.error(f"内容整合接口异常:{e}", exc_info=True) diff --git a/api/services/__pycache__/content_integration_service.cpython-312.pyc b/api/services/__pycache__/content_integration_service.cpython-312.pyc index 8af7c34..4766af2 100644 Binary files a/api/services/__pycache__/content_integration_service.cpython-312.pyc and b/api/services/__pycache__/content_integration_service.cpython-312.pyc differ diff --git a/api/services/__pycache__/database_service.cpython-312.pyc b/api/services/__pycache__/database_service.cpython-312.pyc index fa799f5..38d98e4 100644 Binary files a/api/services/__pycache__/database_service.cpython-312.pyc and b/api/services/__pycache__/database_service.cpython-312.pyc differ diff --git a/api/services/content_integration_service.py b/api/services/content_integration_service.py index 043eaae..672145f 100644 --- a/api/services/content_integration_service.py +++ b/api/services/content_integration_service.py @@ -78,7 +78,8 @@ class ContentIntegrationService: 整合结果字典 """ start_time = time.time() - logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords)}") + + logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords) if keywords else 0}") try: # 确保输出目录存在 diff --git a/api/services/database_service.py b/api/services/database_service.py index 8d3a855..641fc3b 100644 --- a/api/services/database_service.py +++ b/api/services/database_service.py @@ -12,12 +12,46 @@ import traceback from typing import Dict, Any, Optional, List, Tuple import mysql.connector from mysql.connector import pooling +from functools import wraps from core.config import ConfigManager logger = logging.getLogger(__name__) +def database_retry(max_retries: int = 3, delay: float = 1.0): + """数据库查询重试装饰器""" + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not self.db_pool: + logger.error("数据库连接池未初始化,尝试重新初始化...") + self.db_pool = self._init_db_pool() + if not self.db_pool: + logger.error("数据库连接池重新初始化失败,返回兜底数据") + return self._get_fallback_data(func.__name__) + + last_exception = None + current_delay = delay + + for attempt in range(max_retries): + try: + return func(self, *args, **kwargs) + except Exception as e: + last_exception = e + logger.warning(f"数据库查询 {func.__name__} 第 {attempt + 1} 次失败: {e}") + if attempt < max_retries - 1: + time.sleep(current_delay) + current_delay *= 2 + + # 所有重试都失败,返回兜底数据 + logger.error(f"数据库查询 {func.__name__} 在 {max_retries} 次重试后仍然失败: {last_exception}") + return self._get_fallback_data(func.__name__) + + return wrapper + return decorator + + class DatabaseService: """数据库服务类""" @@ -29,7 +63,46 @@ class DatabaseService: config_manager: 配置管理器 """ self.config_manager = config_manager + + # 从配置获取数据库相关设置 + db_config = config_manager.get_raw_config('database') + self.pool_size = db_config.get('pool_size', 10) + self.max_retry_attempts = db_config.get('max_retry_attempts', 3) + self.query_timeout = db_config.get('query_timeout', 30) + self.soft_delete_field = db_config.get('soft_delete_field', 'isDelete') + self.active_record_value = db_config.get('active_record_value', 0) + self.db_pool = self._init_db_pool() + + # 兜底数据缓存 + self._fallback_cache = { + 'styles': [], + 'audiences': [], + 'scenic_spots': [], + 'products': [], + 'materials': [] + } + + def _get_fallback_data(self, func_name: str) -> Any: + """获取兜底数据""" + fallback_mapping = { + 'get_all_styles': self._fallback_cache['styles'], + 'get_all_audiences': self._fallback_cache['audiences'], + 'get_scenic_spot_by_id': None, + 'get_product_by_id': None, + 'get_style_by_id': None, + 'get_audience_by_id': None, + 'get_scenic_spots_by_ids': [], + 'get_products_by_ids': [], + 'get_styles_by_ids': [], + 'get_audiences_by_ids': [], + 'get_content_by_id': None, + 'get_content_by_topic_index': None, + } + + result = fallback_mapping.get(func_name, None) + logger.info(f"使用兜底数据 for {func_name}: {type(result)}") + return result def _init_db_pool(self): """初始化数据库连接池""" @@ -57,7 +130,7 @@ class DatabaseService: # 创建连接池 pool = pooling.MySQLConnectionPool( pool_name=f"database_service_pool_{int(time.time())}", - pool_size=10, + pool_size=self.pool_size, **attempt["config"] ) @@ -90,18 +163,16 @@ class DatabaseService: return processed_config + @database_retry(max_retries=3, delay=1.0) def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]: """根据ID获取单个景区信息""" - if not self.db_pool: - logger.error("数据库连接池未初始化") - return None try: with self.db_pool.get_connection() as conn: with conn.cursor(dictionary=True) as cursor: cursor.execute( - "SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0", - (spot_id,) + f"SELECT * FROM scenicSpot WHERE id = %s AND {self.soft_delete_field} = %s", + (spot_id, self.active_record_value) ) result = cursor.fetchone() if result: @@ -114,6 +185,7 @@ class DatabaseService: logger.error(f"查询景区信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]: """根据ID获取单个产品信息""" if not self.db_pool: @@ -133,6 +205,7 @@ class DatabaseService: logger.error(f"查询产品信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取风格信息 @@ -166,6 +239,7 @@ class DatabaseService: logger.error(f"查询风格信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取受众信息 @@ -199,6 +273,7 @@ class DatabaseService: logger.error(f"查询受众信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]: """ 根据ID列表批量获取景区信息 @@ -227,6 +302,7 @@ class DatabaseService: logger.error(f"批量查询景区信息失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]: """ 根据ID列表批量获取产品信息 @@ -255,6 +331,7 @@ class DatabaseService: logger.error(f"批量查询产品信息失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]: """ 根据ID列表批量获取风格信息 @@ -283,6 +360,7 @@ class DatabaseService: logger.error(f"批量查询风格信息失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]: """ 根据ID列表批量获取受众信息 @@ -311,6 +389,7 @@ class DatabaseService: logger.error(f"批量查询受众信息失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]: """ 获取所有景区列表 @@ -356,6 +435,7 @@ class DatabaseService: logger.error(f"获取景区列表失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]: """ 获取所有产品列表 @@ -403,6 +483,7 @@ class DatabaseService: logger.error(f"获取产品列表失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def list_all_styles(self) -> List[Dict[str, Any]]: """ 获取所有风格列表 @@ -425,6 +506,7 @@ class DatabaseService: logger.error(f"获取风格列表失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def list_all_audiences(self) -> List[Dict[str, Any]]: """ 获取所有受众列表 @@ -458,6 +540,7 @@ class DatabaseService: # 名称到ID的反向查询方法 + @database_retry(max_retries=3, delay=1.0) def get_style_id_by_name(self, style_name: str) -> Optional[int]: """ 根据风格名称获取风格ID @@ -490,6 +573,7 @@ class DatabaseService: logger.error(f"查询风格ID失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_audience_id_by_name(self, audience_name: str) -> Optional[int]: """ 根据受众名称获取受众ID @@ -522,6 +606,7 @@ class DatabaseService: logger.error(f"查询受众ID失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]: """ 根据景区名称获取景区ID @@ -554,6 +639,7 @@ class DatabaseService: logger.error(f"查询景区ID失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_product_id_by_name(self, product_name: str) -> Optional[int]: """ 根据产品名称获取产品ID @@ -588,6 +674,7 @@ class DatabaseService: + @database_retry(max_retries=3, delay=1.0) def get_image_by_id(self, image_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取图像信息 @@ -621,6 +708,7 @@ class DatabaseService: logger.error(f"查询图像信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_images_by_ids(self, image_ids: List[int]) -> List[Dict[str, Any]]: """ 根据ID列表批量获取图像信息 @@ -649,6 +737,7 @@ class DatabaseService: logger.error(f"批量查询图像信息失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_content_by_id(self, content_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取内容信息 @@ -682,6 +771,7 @@ class DatabaseService: logger.error(f"查询内容信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_content_by_topic_index(self, topic_index: str) -> Optional[Dict[str, Any]]: """根据主题索引获取内容信息""" if not self.db_pool: @@ -704,6 +794,7 @@ class DatabaseService: logger.error(f"查询内容信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_images_by_folder_id(self, folder_id: int) -> List[Dict[str, Any]]: """ 根据文件夹ID获取图像列表 @@ -732,6 +823,7 @@ class DatabaseService: logger.error(f"根据文件夹ID获取图像失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_folder_by_id(self, folder_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取文件夹信息 @@ -765,6 +857,7 @@ class DatabaseService: logger.error(f"查询文件夹信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_related_images_for_content(self, content_id: int, limit: int = 10) -> List[Dict[str, Any]]: """ 获取与内容相关的图像列表 @@ -807,6 +900,7 @@ class DatabaseService: # 模板相关查询方法 + @database_retry(max_retries=3, delay=1.0) def get_all_poster_templates(self) -> List[Dict[str, Any]]: """ 获取所有海报模板 @@ -831,6 +925,7 @@ class DatabaseService: logger.error(f"获取海报模板列表失败: {e}") return [] + @database_retry(max_retries=3, delay=1.0) def get_poster_template_by_id(self, template_id: str) -> Optional[Dict[str, Any]]: """ 根据ID获取海报模板信息 @@ -864,6 +959,7 @@ class DatabaseService: logger.error(f"查询模板信息失败: {e}") return None + @database_retry(max_retries=3, delay=1.0) def get_active_poster_templates(self) -> List[Dict[str, Any]]: """ 获取所有激活的海报模板 @@ -935,6 +1031,7 @@ class DatabaseService: except Exception as e: logger.error(f"更新模板使用统计失败: {e}") + @database_retry(max_retries=3, delay=1.0) def get_template_usage_stats(self, template_id: str) -> Optional[Dict[str, Any]]: """ 获取模板使用统计 diff --git a/config/database.json b/config/database.json index cbc6c4b..4ec8470 100644 --- a/config/database.json +++ b/config/database.json @@ -4,5 +4,10 @@ "password": "Kj#9mP2$", "database": "travel_content", "port": 3306, - "charset": "utf8mb4" + "charset": "utf8mb4", + "pool_size": 10, + "max_retry_attempts": 3, + "query_timeout": 30, + "soft_delete_field": "isDelete", + "active_record_value": 0 } \ No newline at end of file