#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Database Service 数据库服务 - API v2 基于原项目的数据库服务实现 """ import logging import os import json import time import traceback from typing import Dict, Any, Optional, List from pathlib import Path try: import mysql.connector from mysql.connector import pooling HAS_MYSQL = True except ImportError: HAS_MYSQL = False mysql = None pooling = None logger = logging.getLogger(__name__) class DatabaseService: """数据库服务类""" def __init__(self, db_config_path: str = None): """ 初始化数据库服务 Args: db_config_path: 数据库配置文件路径 """ self.db_pool = None self.config = None if not HAS_MYSQL: logger.warning("mysql-connector-python 未安装,将使用模拟数据") self._init_mock_data() return # 加载数据库配置 self.config = self._load_db_config(db_config_path) if self.config: self.db_pool = self._init_db_pool() # 如果数据库连接失败,回退到模拟数据 if not self.db_pool: logger.warning("数据库连接失败,将使用模拟数据") self._init_mock_data() def _load_db_config(self, config_path: str = None) -> Optional[Dict[str, Any]]: """加载数据库配置""" if not config_path: # 默认配置路径 project_root = Path(__file__).parent.parent.parent config_path = project_root / "config" / "database.json" try: if Path(config_path).exists(): with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) # 处理环境变量 return self._process_env_vars(config) else: logger.warning(f"数据库配置文件不存在: {config_path}") return None except Exception as e: logger.error(f"加载数据库配置失败: {e}") return None def _process_env_vars(self, config: Dict[str, Any]) -> Dict[str, Any]: """处理环境变量""" processed_config = {} for key, value in config.items(): if isinstance(value, str) and value.startswith("${") and value.endswith("}"): env_var = value[2:-1] processed_config[key] = os.environ.get(env_var, "") else: processed_config[key] = value return processed_config def _init_db_pool(self): """初始化数据库连接池""" if not self.config: return None # 连接尝试配置 connection_attempts = [ {"desc": "使用配置文件中的设置", "config": self.config}, {"desc": "使用明确的密码", "config": {**self.config, "password": "password"}}, {"desc": "使用空密码", "config": {**self.config, "password": ""}}, {"desc": "使用auth_plugin", "config": {**self.config, "auth_plugin": "mysql_native_password"}} ] # 尝试不同的连接方式 for attempt in connection_attempts: try: # 打印连接信息(不包含密码) connection_info = {k: v for k, v in attempt["config"].items() if k != 'password'} logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}") # 创建连接池 pool = pooling.MySQLConnectionPool( pool_name=f"api_v2_pool_{int(time.time())}", pool_size=10, **attempt["config"] ) # 测试连接 with pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchall() logger.info(f"数据库连接池初始化成功 ({attempt['desc']})") return pool except Exception as e: error_details = traceback.format_exc() logger.error(f"数据库连接尝试 ({attempt['desc']}) 失败: {e}\n{error_details}") logger.error("所有数据库连接尝试都失败了") return None def _init_mock_data(self): """初始化模拟数据""" self.mock_data = { "scenic_spots": [ {"id": 1, "name": "三亚亚龙湾", "address": "海南省三亚市", "description": "天下第一湾", "advantage": "海水清澈,沙滩细软", "highlight": "热带风情", "isPublic": True, "userId": 1}, {"id": 2, "name": "北京故宫", "address": "北京市东城区", "description": "明清皇宫", "advantage": "历史悠久", "highlight": "古建筑群", "isPublic": True, "userId": 1}, {"id": 3, "name": "杭州西湖", "address": "浙江省杭州市", "description": "人间天堂", "advantage": "风景秀丽", "highlight": "湖光山色", "isPublic": True, "userId": 1}, {"id": 4, "name": "桂林山水", "address": "广西桂林市", "description": "甲天下", "advantage": "山清水秀", "highlight": "喀斯特地貌", "isPublic": True, "userId": 1}, {"id": 5, "name": "黄山", "address": "安徽省黄山市", "description": "天下第一奇山", "advantage": "奇松怪石", "highlight": "云海日出", "isPublic": True, "userId": 1} ], "products": [ {"id": 1, "productName": "亚龙湾度假套餐", "originPrice": 2999.0, "realPrice": 1999.0, "packageInfo": "3天2晚", "detailedDescription": "包含酒店住宿、早餐、接送服务", "keyAdvantages": "五星级酒店,私人海滩", "highlights": "海景房,无边泳池", "usageRules": "需提前3天预订", "surcharge": "节假日加价200元", "reservation": "电话预订", "userId": 1}, {"id": 2, "productName": "故宫门票", "originPrice": 60.0, "realPrice": 60.0, "packageInfo": "成人票", "detailedDescription": "故宫博物院参观门票", "keyAdvantages": "免排队入场", "highlights": "珍贵文物", "usageRules": "当日有效", "surcharge": "无", "reservation": "网上预订", "userId": 1}, {"id": 3, "productName": "西湖游船票", "originPrice": 55.0, "realPrice": 45.0, "packageInfo": "单程票", "detailedDescription": "西湖景区游船体验", "keyAdvantages": "湖心亭停靠", "highlights": "湖光山色", "usageRules": "当日有效", "surcharge": "无", "reservation": "现场购买", "userId": 1} ], "styles": [ {"id": 1, "styleName": "活泼有趣", "styleDescription": "轻松愉快的语言风格,充满活力"}, {"id": 2, "styleName": "专业严谨", "styleDescription": "专业权威的表达方式,信息准确"}, {"id": 3, "styleName": "温馨感人", "styleDescription": "温暖感人的情感表达,富有人情味"}, {"id": 4, "styleName": "幽默风趣", "styleDescription": "诙谐幽默的表达方式,富有趣味性"}, {"id": 5, "styleName": "文艺清新", "styleDescription": "文艺范的表达方式,清新脱俗"} ], "audiences": [ {"id": 1, "audienceName": "年轻情侣", "audienceDescription": "18-35岁的情侣群体,追求浪漫体验"}, {"id": 2, "audienceName": "亲子家庭", "audienceDescription": "有孩子的家庭群体,注重亲子互动"}, {"id": 3, "audienceName": "商务人士", "audienceDescription": "商务出行的专业人群,时间宝贵"}, {"id": 4, "audienceName": "年轻学生", "audienceDescription": "大学生群体,预算有限但充满活力"}, {"id": 5, "audienceName": "中老年群体", "audienceDescription": "50岁以上群体,注重舒适和安全"} ] } def is_available(self) -> bool: """检查数据库服务是否可用""" if self.db_pool: try: with self.db_pool.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchall() return True except: return False return False def get_scenic_spots(self, limit: int = 10, offset: int = 0, search: str = None) -> tuple[List[Dict[str, Any]], int]: """获取景区列表""" if self.db_pool and self.is_available(): return self._get_scenic_spots_from_db(limit, offset, search) else: return self._get_scenic_spots_from_mock(limit, offset, search) def _get_scenic_spots_from_db(self, limit: int, offset: int, search: str = None) -> tuple[List[Dict[str, Any]], int]: """从数据库获取景区列表""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) # 构建查询语句 base_query = "SELECT * FROM scenic_spots WHERE isPublic = 1" count_query = "SELECT COUNT(*) as total FROM scenic_spots WHERE isPublic = 1" params = [] if search: base_query += " AND name LIKE %s" count_query += " AND name LIKE %s" search_param = f"%{search}%" params.append(search_param) # 获取总数 cursor.execute(count_query, params) total = cursor.fetchone()['total'] # 获取分页数据 base_query += " ORDER BY id LIMIT %s OFFSET %s" params.extend([limit, offset]) cursor.execute(base_query, params) spots = cursor.fetchall() return spots, total except Exception as e: logger.error(f"从数据库获取景区列表失败: {e}") return self._get_scenic_spots_from_mock(limit, offset, search) def _get_scenic_spots_from_mock(self, limit: int, offset: int, search: str = None) -> tuple[List[Dict[str, Any]], int]: """从模拟数据获取景区列表""" spots = self.mock_data["scenic_spots"] # 搜索过滤 if search: spots = [spot for spot in spots if search.lower() in spot["name"].lower()] # 分页 total = len(spots) spots = spots[offset:offset + limit] return spots, total def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]: """根据ID获取景区详情""" if self.db_pool and self.is_available(): return self._get_scenic_spot_by_id_from_db(spot_id) else: return self._get_scenic_spot_by_id_from_mock(spot_id) def _get_scenic_spot_by_id_from_db(self, spot_id: int) -> Optional[Dict[str, Any]]: """从数据库根据ID获取景区详情""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM scenic_spots WHERE id = %s AND isPublic = 1", (spot_id,)) return cursor.fetchone() except Exception as e: logger.error(f"从数据库获取景区详情失败: {e}") return self._get_scenic_spot_by_id_from_mock(spot_id) def _get_scenic_spot_by_id_from_mock(self, spot_id: int) -> Optional[Dict[str, Any]]: """从模拟数据根据ID获取景区详情""" return next((s for s in self.mock_data["scenic_spots"] if s["id"] == spot_id), None) def get_products(self, limit: int = 10, offset: int = 0, search: str = None) -> tuple[List[Dict[str, Any]], int]: """获取产品列表""" if self.db_pool and self.is_available(): return self._get_products_from_db(limit, offset, search) else: return self._get_products_from_mock(limit, offset, search) def _get_products_from_db(self, limit: int, offset: int, search: str = None) -> tuple[List[Dict[str, Any]], int]: """从数据库获取产品列表""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) base_query = "SELECT * FROM products" count_query = "SELECT COUNT(*) as total FROM products" params = [] if search: base_query += " WHERE productName LIKE %s" count_query += " WHERE productName LIKE %s" search_param = f"%{search}%" params.append(search_param) # 获取总数 cursor.execute(count_query, params) total = cursor.fetchone()['total'] # 获取分页数据 base_query += " ORDER BY id LIMIT %s OFFSET %s" params.extend([limit, offset]) cursor.execute(base_query, params) products = cursor.fetchall() return products, total except Exception as e: logger.error(f"从数据库获取产品列表失败: {e}") return self._get_products_from_mock(limit, offset, search) def _get_products_from_mock(self, limit: int, offset: int, search: str = None) -> tuple[List[Dict[str, Any]], int]: """从模拟数据获取产品列表""" products = self.mock_data["products"] # 搜索过滤 if search: products = [p for p in products if search.lower() in p["productName"].lower()] # 分页 total = len(products) products = products[offset:offset + limit] return products, total def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]: """根据ID获取产品详情""" if self.db_pool and self.is_available(): return self._get_product_by_id_from_db(product_id) else: return self._get_product_by_id_from_mock(product_id) def _get_product_by_id_from_db(self, product_id: int) -> Optional[Dict[str, Any]]: """从数据库根据ID获取产品详情""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,)) return cursor.fetchone() except Exception as e: logger.error(f"从数据库获取产品详情失败: {e}") return self._get_product_by_id_from_mock(product_id) def _get_product_by_id_from_mock(self, product_id: int) -> Optional[Dict[str, Any]]: """从模拟数据根据ID获取产品详情""" return next((p for p in self.mock_data["products"] if p["id"] == product_id), None) def get_styles(self) -> List[Dict[str, Any]]: """获取风格列表""" if self.db_pool and self.is_available(): return self._get_styles_from_db() else: return self.mock_data["styles"] def _get_styles_from_db(self) -> List[Dict[str, Any]]: """从数据库获取风格列表""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM styles ORDER BY id") return cursor.fetchall() except Exception as e: logger.error(f"从数据库获取风格列表失败: {e}") return self.mock_data["styles"] def get_audiences(self) -> List[Dict[str, Any]]: """获取受众列表""" if self.db_pool and self.is_available(): return self._get_audiences_from_db() else: return self.mock_data["audiences"] def _get_audiences_from_db(self) -> List[Dict[str, Any]]: """从数据库获取受众列表""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM audiences ORDER BY id") return cursor.fetchall() except Exception as e: logger.error(f"从数据库获取受众列表失败: {e}") return self.mock_data["audiences"] def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]: """批量获取景区信息""" if self.db_pool and self.is_available(): return self._get_scenic_spots_by_ids_from_db(spot_ids) else: return [s for s in self.mock_data["scenic_spots"] if s["id"] in spot_ids] def _get_scenic_spots_by_ids_from_db(self, spot_ids: List[int]) -> List[Dict[str, Any]]: """从数据库批量获取景区信息""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) placeholders = ','.join(['%s'] * len(spot_ids)) query = f"SELECT * FROM scenic_spots WHERE id IN ({placeholders}) AND isPublic = 1" cursor.execute(query, spot_ids) return cursor.fetchall() except Exception as e: logger.error(f"从数据库批量获取景区信息失败: {e}") return [s for s in self.mock_data["scenic_spots"] if s["id"] in spot_ids] def get_products_by_ids(self, product_ids: List[int]) -> List[Dict[str, Any]]: """批量获取产品信息""" if self.db_pool and self.is_available(): return self._get_products_by_ids_from_db(product_ids) else: return [p for p in self.mock_data["products"] if p["id"] in product_ids] def _get_products_by_ids_from_db(self, product_ids: List[int]) -> List[Dict[str, Any]]: """从数据库批量获取产品信息""" try: with self.db_pool.get_connection() as conn: cursor = conn.cursor(dictionary=True) placeholders = ','.join(['%s'] * len(product_ids)) query = f"SELECT * FROM products WHERE id IN ({placeholders})" cursor.execute(query, product_ids) return cursor.fetchall() except Exception as e: logger.error(f"从数据库批量获取产品信息失败: {e}") return [p for p in self.mock_data["products"] if p["id"] in product_ids]