TravelContentCreator/api/services/database_service.py

650 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库服务层
负责景区、产品、风格、受众等数据的查询和管理
"""
import logging
import time
import traceback
from typing import Dict, Any, Optional, List, Tuple
import mysql.connector
from mysql.connector import pooling
from core.config import ConfigManager
logger = logging.getLogger(__name__)
class DatabaseService:
"""数据库服务类"""
def __init__(self, config_manager: ConfigManager):
"""
初始化数据库服务
Args:
config_manager: 配置管理器
"""
self.config_manager = config_manager
self.db_pool = self._init_db_pool()
def _init_db_pool(self):
"""初始化数据库连接池"""
# 获取数据库配置
raw_db_config = self.config_manager.get_raw_config('database')
# 处理环境变量
db_config = self._process_env_vars(raw_db_config)
# 连接尝试配置
connection_attempts = [
{"desc": "使用配置文件中的设置", "config": db_config},
{"desc": "使用明确的密码", "config": {**db_config, "password": "password"}},
{"desc": "使用空密码", "config": {**db_config, "password": ""}},
{"desc": "使用auth_plugin", "config": {**db_config, "auth_plugin": "mysql_native_password"}}
]
# 尝试不同的连接方式
for attempt in connection_attempts:
try:
# 打印连接信息(不包含密码)
connection_info = {k: v for k, v in attempt["config"].items() if k != 'password'}
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
# 创建连接池
pool = pooling.MySQLConnectionPool(
pool_name=f"database_service_pool_{int(time.time())}",
pool_size=10,
**attempt["config"]
)
# 测试连接
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchall()
logger.info(f"数据库连接池初始化成功 ({attempt['desc']})")
return pool
except Exception as e:
error_details = traceback.format_exc()
logger.error(f"数据库连接尝试 ({attempt['desc']}) 失败: {e}\n{error_details}")
logger.error("所有数据库连接尝试都失败了")
return None
def _process_env_vars(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""处理环境变量"""
import os
processed_config = {}
for key, value in config.items():
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
processed_config[key] = os.environ.get(env_var, "")
else:
processed_config[key] = value
return processed_config
def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取景区信息
Args:
spot_id: 景区ID
Returns:
景区信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0",
(spot_id,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"找到景区信息: ID={spot_id}, 名称={result['name']}")
return result
else:
logger.warning(f"未找到景区信息: ID={spot_id}")
return None
except Exception as e:
logger.error(f"查询景区信息失败: {e}")
return None
def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取产品信息
Args:
product_id: 产品ID
Returns:
产品信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM product WHERE id = %s AND isDelete = 0",
(product_id,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"找到产品信息: ID={product_id}, 名称={result['name']}")
return result
else:
logger.warning(f"未找到产品信息: ID={product_id}")
return None
except Exception as e:
logger.error(f"查询产品信息失败: {e}")
return None
def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取风格信息
Args:
style_id: 风格ID
Returns:
风格信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM contentStyle WHERE id = %s AND isDelete = 0",
(style_id,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"找到风格信息: ID={style_id}, 名称={result['styleName']}")
return result
else:
logger.warning(f"未找到风格信息: ID={style_id}")
return None
except Exception as e:
logger.error(f"查询风格信息失败: {e}")
return None
def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取受众信息
Args:
audience_id: 受众ID
Returns:
受众信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM targetAudience WHERE id = %s AND isDelete = 0",
(audience_id,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
logger.info(f"找到受众信息: ID={audience_id}, 名称={result['audienceName']}")
return result
else:
logger.warning(f"未找到受众信息: ID={audience_id}")
return None
except Exception as e:
logger.error(f"查询受众信息失败: {e}")
return None
def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取景区信息
Args:
spot_ids: 景区ID列表
Returns:
景区信息列表
"""
if not self.db_pool or not spot_ids:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建IN查询
placeholders = ','.join(['%s'] * len(spot_ids))
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, spot_ids)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询景区信息失败: {e}")
return []
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取产品信息
Args:
productIds: 产品ID列表
Returns:
产品信息列表
"""
if not self.db_pool or not productIds:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建IN查询
placeholders = ','.join(['%s'] * len(productIds))
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, productIds)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询产品信息失败: {e}")
return []
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取风格信息
Args:
styleIds: 风格ID列表
Returns:
风格信息列表
"""
if not self.db_pool or not styleIds:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建IN查询
placeholders = ','.join(['%s'] * len(styleIds))
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, styleIds)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询风格信息失败: {e}")
return []
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取受众信息
Args:
audienceIds: 受众ID列表
Returns:
受众信息列表
"""
if not self.db_pool or not audienceIds:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建IN查询
placeholders = ','.join(['%s'] * len(audienceIds))
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, audienceIds)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询受众信息失败: {e}")
return []
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有景区列表
Args:
user_id: 用户ID如果提供则只返回该用户的景区
is_public: 是否公开,如果提供则过滤公开/私有景区
Returns:
景区列表
"""
if not self.db_pool:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"SELECT id, name, address, description, advantage, highlight, isPublic, userId FROM scenicSpot WHERE {' AND '.join(conditions)} ORDER BY name"
cursor.execute(query, params)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"获取景区列表: 找到{len(results)}个景区")
return results
except Exception as e:
logger.error(f"获取景区列表失败: {e}")
return []
def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有产品列表
Args:
user_id: 用户ID如果提供则只返回该用户的产品
is_public: 是否公开,如果提供则过滤公开/私有产品
Returns:
产品列表
"""
if not self.db_pool:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"SELECT id, name, originPrice, realPrice, packageInfo, description, advantage, highlight, isPublic, userId FROM product WHERE {' AND '.join(conditions)} ORDER BY name"
cursor.execute(query, params)
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"获取产品列表: 找到{len(results)}个产品")
return results
except Exception as e:
logger.error(f"获取产品列表失败: {e}")
return []
def list_all_styles(self) -> List[Dict[str, Any]]:
"""
获取所有风格列表
Returns:
风格列表
"""
if not self.db_pool:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, styleName, description FROM contentStyle WHERE isDelete = 0 ORDER BY styleName")
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"获取风格列表: 找到{len(results)}个风格")
return results
except Exception as e:
logger.error(f"获取风格列表失败: {e}")
return []
def list_all_audiences(self) -> List[Dict[str, Any]]:
"""
获取所有受众列表
Returns:
受众列表
"""
if not self.db_pool:
return []
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, audienceName, description FROM targetAudience WHERE isDelete = 0 ORDER BY audienceName")
results = cursor.fetchall()
cursor.close()
conn.close()
logger.info(f"获取受众列表: 找到{len(results)}个受众")
return results
except Exception as e:
logger.error(f"获取受众列表失败: {e}")
return []
def is_available(self) -> bool:
"""
检查数据库服务是否可用
Returns:
数据库是否可用
"""
return self.db_pool is not None
# 名称到ID的反向查询方法
def get_style_id_by_name(self, style_name: str) -> Optional[int]:
"""
根据风格名称获取风格ID
Args:
style_name: 风格名称
Returns:
风格ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT id FROM style WHERE styleName = %s AND isDelete = 0",
(style_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return result['id']
else:
logger.warning(f"未找到风格: {style_name}")
return None
except Exception as e:
logger.error(f"查询风格ID失败: {e}")
return None
def get_audience_id_by_name(self, audience_name: str) -> Optional[int]:
"""
根据受众名称获取受众ID
Args:
audience_name: 受众名称
Returns:
受众ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT id FROM targetAudience WHERE audienceName = %s AND isDelete = 0",
(audience_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return result['id']
else:
logger.warning(f"未找到受众: {audience_name}")
return None
except Exception as e:
logger.error(f"查询受众ID失败: {e}")
return None
def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]:
"""
根据景区名称获取景区ID
Args:
spot_name: 景区名称
Returns:
景区ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT id FROM scenicSpot WHERE name = %s AND isDelete = 0",
(spot_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return result['id']
else:
logger.warning(f"未找到景区: {spot_name}")
return None
except Exception as e:
logger.error(f"查询景区ID失败: {e}")
return None
def get_product_id_by_name(self, product_name: str) -> Optional[int]:
"""
根据产品名称获取产品ID
Args:
product_name: 产品名称
Returns:
产品ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT id FROM product WHERE productName = %s AND isDelete = 0",
(product_name,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
return result['id']
else:
logger.warning(f"未找到产品: {product_name}")
return None
except Exception as e:
logger.error(f"查询产品ID失败: {e}")
return None