508 lines
17 KiB
Python
508 lines
17 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
数据库服务层
|
|||
|
|
负责景区、产品、风格、受众等数据的查询和管理
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import logging
|
|||
|
|
import time
|
|||
|
|
import traceback
|
|||
|
|
from typing import Dict, Any, Optional, List, Tuple
|
|||
|
|
import mysql.connector
|
|||
|
|
from mysql.connector import pooling
|
|||
|
|
|
|||
|
|
from core.config import ConfigManager
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DatabaseService:
|
|||
|
|
"""数据库服务类"""
|
|||
|
|
|
|||
|
|
def __init__(self, config_manager: ConfigManager):
|
|||
|
|
"""
|
|||
|
|
初始化数据库服务
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
config_manager: 配置管理器
|
|||
|
|
"""
|
|||
|
|
self.config_manager = config_manager
|
|||
|
|
self.db_pool = self._init_db_pool()
|
|||
|
|
|
|||
|
|
def _init_db_pool(self):
|
|||
|
|
"""初始化数据库连接池"""
|
|||
|
|
# 获取数据库配置
|
|||
|
|
raw_db_config = self.config_manager.get_raw_config('database')
|
|||
|
|
|
|||
|
|
# 处理环境变量
|
|||
|
|
db_config = self._process_env_vars(raw_db_config)
|
|||
|
|
|
|||
|
|
# 连接尝试配置
|
|||
|
|
connection_attempts = [
|
|||
|
|
{"desc": "使用配置文件中的设置", "config": db_config},
|
|||
|
|
{"desc": "使用明确的密码", "config": {**db_config, "password": "password"}},
|
|||
|
|
{"desc": "使用空密码", "config": {**db_config, "password": ""}},
|
|||
|
|
{"desc": "使用auth_plugin", "config": {**db_config, "auth_plugin": "mysql_native_password"}}
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 尝试不同的连接方式
|
|||
|
|
for attempt in connection_attempts:
|
|||
|
|
try:
|
|||
|
|
# 打印连接信息(不包含密码)
|
|||
|
|
connection_info = {k: v for k, v in attempt["config"].items() if k != 'password'}
|
|||
|
|
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
|
|||
|
|
|
|||
|
|
# 创建连接池
|
|||
|
|
pool = pooling.MySQLConnectionPool(
|
|||
|
|
pool_name=f"database_service_pool_{int(time.time())}",
|
|||
|
|
pool_size=10,
|
|||
|
|
**attempt["config"]
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 测试连接
|
|||
|
|
with pool.get_connection() as conn:
|
|||
|
|
cursor = conn.cursor()
|
|||
|
|
cursor.execute("SELECT 1")
|
|||
|
|
cursor.fetchall()
|
|||
|
|
|
|||
|
|
logger.info(f"数据库连接池初始化成功 ({attempt['desc']})")
|
|||
|
|
return pool
|
|||
|
|
except Exception as e:
|
|||
|
|
error_details = traceback.format_exc()
|
|||
|
|
logger.error(f"数据库连接尝试 ({attempt['desc']}) 失败: {e}\n{error_details}")
|
|||
|
|
|
|||
|
|
logger.error("所有数据库连接尝试都失败了")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def _process_env_vars(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|||
|
|
"""处理环境变量"""
|
|||
|
|
import os
|
|||
|
|
|
|||
|
|
processed_config = {}
|
|||
|
|
for key, value in config.items():
|
|||
|
|
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
|
|||
|
|
env_var = value[2:-1]
|
|||
|
|
processed_config[key] = os.environ.get(env_var, "")
|
|||
|
|
else:
|
|||
|
|
processed_config[key] = value
|
|||
|
|
|
|||
|
|
return processed_config
|
|||
|
|
|
|||
|
|
def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID获取景区信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
spot_id: 景区ID
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
景区信息字典,如果未找到则返回None
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
logger.error("数据库连接池未初始化")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute(
|
|||
|
|
"SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0",
|
|||
|
|
(spot_id,)
|
|||
|
|
)
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
if result:
|
|||
|
|
logger.info(f"找到景区信息: ID={spot_id}, 名称={result['name']}")
|
|||
|
|
return result
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"未找到景区信息: ID={spot_id}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"查询景区信息失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID获取产品信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
product_id: 产品ID
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
产品信息字典,如果未找到则返回None
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
logger.error("数据库连接池未初始化")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute(
|
|||
|
|
"SELECT * FROM product WHERE id = %s AND isDelete = 0",
|
|||
|
|
(product_id,)
|
|||
|
|
)
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
if result:
|
|||
|
|
logger.info(f"找到产品信息: ID={product_id}, 名称={result['name']}")
|
|||
|
|
return result
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"未找到产品信息: ID={product_id}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"查询产品信息失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID获取风格信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
style_id: 风格ID
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
风格信息字典,如果未找到则返回None
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
logger.error("数据库连接池未初始化")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute(
|
|||
|
|
"SELECT * FROM contentStyle WHERE id = %s AND isDelete = 0",
|
|||
|
|
(style_id,)
|
|||
|
|
)
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
if result:
|
|||
|
|
logger.info(f"找到风格信息: ID={style_id}, 名称={result['styleName']}")
|
|||
|
|
return result
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"未找到风格信息: ID={style_id}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"查询风格信息失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID获取受众信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
audience_id: 受众ID
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
受众信息字典,如果未找到则返回None
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
logger.error("数据库连接池未初始化")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute(
|
|||
|
|
"SELECT * FROM targetAudience WHERE id = %s AND isDelete = 0",
|
|||
|
|
(audience_id,)
|
|||
|
|
)
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
if result:
|
|||
|
|
logger.info(f"找到受众信息: ID={audience_id}, 名称={result['audienceName']}")
|
|||
|
|
return result
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"未找到受众信息: ID={audience_id}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"查询受众信息失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID列表批量获取景区信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
spot_ids: 景区ID列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
景区信息列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool or not spot_ids:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建IN查询
|
|||
|
|
placeholders = ','.join(['%s'] * len(spot_ids))
|
|||
|
|
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
|
|||
|
|
cursor.execute(query, spot_ids)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len(results)}个")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"批量查询景区信息失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def get_products_by_ids(self, product_ids: List[int]) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID列表批量获取产品信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
product_ids: 产品ID列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
产品信息列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool or not product_ids:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建IN查询
|
|||
|
|
placeholders = ','.join(['%s'] * len(product_ids))
|
|||
|
|
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
|
|||
|
|
cursor.execute(query, product_ids)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"批量查询产品信息: 请求{len(product_ids)}个,找到{len(results)}个")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"批量查询产品信息失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def get_styles_by_ids(self, style_ids: List[int]) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID列表批量获取风格信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
style_ids: 风格ID列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
风格信息列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool or not style_ids:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建IN查询
|
|||
|
|
placeholders = ','.join(['%s'] * len(style_ids))
|
|||
|
|
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
|
|||
|
|
cursor.execute(query, style_ids)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"批量查询风格信息: 请求{len(style_ids)}个,找到{len(results)}个")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"批量查询风格信息失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def get_audiences_by_ids(self, audience_ids: List[int]) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
根据ID列表批量获取受众信息
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
audience_ids: 受众ID列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
受众信息列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool or not audience_ids:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建IN查询
|
|||
|
|
placeholders = ','.join(['%s'] * len(audience_ids))
|
|||
|
|
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
|
|||
|
|
cursor.execute(query, audience_ids)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"批量查询受众信息: 请求{len(audience_ids)}个,找到{len(results)}个")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"批量查询受众信息失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
获取所有景区列表
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
user_id: 用户ID,如果提供则只返回该用户的景区
|
|||
|
|
is_public: 是否公开,如果提供则过滤公开/私有景区
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
景区列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建查询条件
|
|||
|
|
conditions = ["isDelete = 0"]
|
|||
|
|
params = []
|
|||
|
|
|
|||
|
|
if user_id is not None:
|
|||
|
|
conditions.append("userId = %s")
|
|||
|
|
params.append(user_id)
|
|||
|
|
|
|||
|
|
if is_public is not None:
|
|||
|
|
conditions.append("isPublic = %s")
|
|||
|
|
params.append(1 if is_public else 0)
|
|||
|
|
|
|||
|
|
query = f"SELECT id, name, address, description, advantage, highlight, isPublic, userId FROM scenicSpot WHERE {' AND '.join(conditions)} ORDER BY name"
|
|||
|
|
cursor.execute(query, params)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"获取景区列表: 找到{len(results)}个景区")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取景区列表失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
获取所有产品列表
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
user_id: 用户ID,如果提供则只返回该用户的产品
|
|||
|
|
is_public: 是否公开,如果提供则过滤公开/私有产品
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
产品列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
|
|||
|
|
# 构建查询条件
|
|||
|
|
conditions = ["isDelete = 0"]
|
|||
|
|
params = []
|
|||
|
|
|
|||
|
|
if user_id is not None:
|
|||
|
|
conditions.append("userId = %s")
|
|||
|
|
params.append(user_id)
|
|||
|
|
|
|||
|
|
if is_public is not None:
|
|||
|
|
conditions.append("isPublic = %s")
|
|||
|
|
params.append(1 if is_public else 0)
|
|||
|
|
|
|||
|
|
query = f"SELECT id, name, originPrice, realPrice, packageInfo, description, advantage, highlight, isPublic, userId FROM product WHERE {' AND '.join(conditions)} ORDER BY name"
|
|||
|
|
cursor.execute(query, params)
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"获取产品列表: 找到{len(results)}个产品")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取产品列表失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def list_all_styles(self) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
获取所有风格列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
风格列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute("SELECT id, styleName, description FROM contentStyle WHERE isDelete = 0 ORDER BY styleName")
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"获取风格列表: 找到{len(results)}个风格")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取风格列表失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def list_all_audiences(self) -> List[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
获取所有受众列表
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
受众列表
|
|||
|
|
"""
|
|||
|
|
if not self.db_pool:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conn = self.db_pool.get_connection()
|
|||
|
|
cursor = conn.cursor(dictionary=True)
|
|||
|
|
cursor.execute("SELECT id, audienceName, description FROM targetAudience WHERE isDelete = 0 ORDER BY audienceName")
|
|||
|
|
results = cursor.fetchall()
|
|||
|
|
cursor.close()
|
|||
|
|
conn.close()
|
|||
|
|
|
|||
|
|
logger.info(f"获取受众列表: 找到{len(results)}个受众")
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取受众列表失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def is_available(self) -> bool:
|
|||
|
|
"""
|
|||
|
|
检查数据库服务是否可用
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
数据库是否可用
|
|||
|
|
"""
|
|||
|
|
return self.db_pool is not None
|