2025-07-14 17:46:20 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库服务层
负责景区 、 产品 、 风格 、 受众等数据的查询和管理
"""
import logging
import time
import traceback
from typing import Dict , Any , Optional , List , Tuple
import mysql . connector
from mysql . connector import pooling
from core . config import ConfigManager
logger = logging . getLogger ( __name__ )
class DatabaseService :
""" 数据库服务类 """
def __init__ ( self , config_manager : ConfigManager ) :
"""
初始化数据库服务
Args :
config_manager : 配置管理器
"""
self . config_manager = config_manager
self . db_pool = self . _init_db_pool ( )
def _init_db_pool ( self ) :
""" 初始化数据库连接池 """
# 获取数据库配置
raw_db_config = self . config_manager . get_raw_config ( ' database ' )
# 处理环境变量
db_config = self . _process_env_vars ( raw_db_config )
# 连接尝试配置
connection_attempts = [
{ " desc " : " 使用配置文件中的设置 " , " config " : db_config } ,
{ " desc " : " 使用明确的密码 " , " config " : { * * db_config , " password " : " password " } } ,
{ " desc " : " 使用空密码 " , " config " : { * * db_config , " password " : " " } } ,
{ " desc " : " 使用auth_plugin " , " config " : { * * db_config , " auth_plugin " : " mysql_native_password " } }
]
# 尝试不同的连接方式
for attempt in connection_attempts :
try :
# 打印连接信息(不包含密码)
connection_info = { k : v for k , v in attempt [ " config " ] . items ( ) if k != ' password ' }
logger . info ( f " 尝试连接数据库 ( { attempt [ ' desc ' ] } ): { connection_info } " )
# 创建连接池
pool = pooling . MySQLConnectionPool (
pool_name = f " database_service_pool_ { int ( time . time ( ) ) } " ,
pool_size = 10 ,
* * attempt [ " config " ]
)
# 测试连接
with pool . get_connection ( ) as conn :
cursor = conn . cursor ( )
cursor . execute ( " SELECT 1 " )
cursor . fetchall ( )
logger . info ( f " 数据库连接池初始化成功 ( { attempt [ ' desc ' ] } ) " )
return pool
except Exception as e :
error_details = traceback . format_exc ( )
logger . error ( f " 数据库连接尝试 ( { attempt [ ' desc ' ] } ) 失败: { e } \n { error_details } " )
logger . error ( " 所有数据库连接尝试都失败了 " )
return None
def _process_env_vars ( self , config : Dict [ str , Any ] ) - > Dict [ str , Any ] :
""" 处理环境变量 """
import os
processed_config = { }
for key , value in config . items ( ) :
if isinstance ( value , str ) and value . startswith ( " $ { " ) and value . endswith ( " } " ) :
env_var = value [ 2 : - 1 ]
processed_config [ key ] = os . environ . get ( env_var , " " )
else :
processed_config [ key ] = value
return processed_config
def get_scenic_spot_by_id ( self , spot_id : int ) - > Optional [ Dict [ str , Any ] ] :
"""
根据ID获取景区信息
Args :
spot_id : 景区ID
Returns :
景区信息字典 , 如果未找到则返回None
"""
if not self . db_pool :
logger . error ( " 数据库连接池未初始化 " )
return None
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute (
" SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0 " ,
( spot_id , )
)
result = cursor . fetchone ( )
cursor . close ( )
conn . close ( )
if result :
logger . info ( f " 找到景区信息: ID= { spot_id } , 名称= { result [ ' name ' ] } " )
return result
else :
logger . warning ( f " 未找到景区信息: ID= { spot_id } " )
return None
except Exception as e :
logger . error ( f " 查询景区信息失败: { e } " )
return None
def get_product_by_id ( self , product_id : int ) - > Optional [ Dict [ str , Any ] ] :
"""
根据ID获取产品信息
Args :
product_id : 产品ID
Returns :
产品信息字典 , 如果未找到则返回None
"""
if not self . db_pool :
logger . error ( " 数据库连接池未初始化 " )
return None
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute (
" SELECT * FROM product WHERE id = %s AND isDelete = 0 " ,
( product_id , )
)
result = cursor . fetchone ( )
cursor . close ( )
conn . close ( )
if result :
logger . info ( f " 找到产品信息: ID= { product_id } , 名称= { result [ ' name ' ] } " )
return result
else :
logger . warning ( f " 未找到产品信息: ID= { product_id } " )
return None
except Exception as e :
logger . error ( f " 查询产品信息失败: { e } " )
return None
def get_style_by_id ( self , style_id : int ) - > Optional [ Dict [ str , Any ] ] :
"""
根据ID获取风格信息
Args :
style_id : 风格ID
Returns :
风格信息字典 , 如果未找到则返回None
"""
if not self . db_pool :
logger . error ( " 数据库连接池未初始化 " )
return None
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute (
" SELECT * FROM contentStyle WHERE id = %s AND isDelete = 0 " ,
( style_id , )
)
result = cursor . fetchone ( )
cursor . close ( )
conn . close ( )
if result :
logger . info ( f " 找到风格信息: ID= { style_id } , 名称= { result [ ' styleName ' ] } " )
return result
else :
logger . warning ( f " 未找到风格信息: ID= { style_id } " )
return None
except Exception as e :
logger . error ( f " 查询风格信息失败: { e } " )
return None
def get_audience_by_id ( self , audience_id : int ) - > Optional [ Dict [ str , Any ] ] :
"""
根据ID获取受众信息
Args :
audience_id : 受众ID
Returns :
受众信息字典 , 如果未找到则返回None
"""
if not self . db_pool :
logger . error ( " 数据库连接池未初始化 " )
return None
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute (
" SELECT * FROM targetAudience WHERE id = %s AND isDelete = 0 " ,
( audience_id , )
)
result = cursor . fetchone ( )
cursor . close ( )
conn . close ( )
if result :
logger . info ( f " 找到受众信息: ID= { audience_id } , 名称= { result [ ' audienceName ' ] } " )
return result
else :
logger . warning ( f " 未找到受众信息: ID= { audience_id } " )
return None
except Exception as e :
logger . error ( f " 查询受众信息失败: { e } " )
return None
def get_scenic_spots_by_ids ( self , spot_ids : List [ int ] ) - > List [ Dict [ str , Any ] ] :
"""
根据ID列表批量获取景区信息
Args :
spot_ids : 景区ID列表
Returns :
景区信息列表
"""
if not self . db_pool or not spot_ids :
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建IN查询
placeholders = ' , ' . join ( [ ' %s ' ] * len ( spot_ids ) )
query = f " SELECT * FROM scenicSpot WHERE id IN ( { placeholders } ) AND isDelete = 0 "
cursor . execute ( query , spot_ids )
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
logger . info ( f " 批量查询景区信息: 请求 { len ( spot_ids ) } 个,找到 { len ( results ) } 个 " )
return results
except Exception as e :
logger . error ( f " 批量查询景区信息失败: { e } " )
return [ ]
2025-07-15 10:59:36 +08:00
def get_products_by_ids ( self , productIds : List [ int ] ) - > List [ Dict [ str , Any ] ] :
2025-07-14 17:46:20 +08:00
"""
根据ID列表批量获取产品信息
Args :
2025-07-15 10:59:36 +08:00
productIds : 产品ID列表
2025-07-14 17:46:20 +08:00
Returns :
产品信息列表
"""
2025-07-15 10:59:36 +08:00
if not self . db_pool or not productIds :
2025-07-14 17:46:20 +08:00
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建IN查询
2025-07-15 10:59:36 +08:00
placeholders = ' , ' . join ( [ ' %s ' ] * len ( productIds ) )
2025-07-14 17:46:20 +08:00
query = f " SELECT * FROM product WHERE id IN ( { placeholders } ) AND isDelete = 0 "
2025-07-15 10:59:36 +08:00
cursor . execute ( query , productIds )
2025-07-14 17:46:20 +08:00
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
2025-07-15 10:59:36 +08:00
logger . info ( f " 批量查询产品信息: 请求 { len ( productIds ) } 个,找到 { len ( results ) } 个 " )
2025-07-14 17:46:20 +08:00
return results
except Exception as e :
logger . error ( f " 批量查询产品信息失败: { e } " )
return [ ]
2025-07-15 10:59:36 +08:00
def get_styles_by_ids ( self , styleIds : List [ int ] ) - > List [ Dict [ str , Any ] ] :
2025-07-14 17:46:20 +08:00
"""
根据ID列表批量获取风格信息
Args :
2025-07-15 10:59:36 +08:00
styleIds : 风格ID列表
2025-07-14 17:46:20 +08:00
Returns :
风格信息列表
"""
2025-07-15 10:59:36 +08:00
if not self . db_pool or not styleIds :
2025-07-14 17:46:20 +08:00
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建IN查询
2025-07-15 10:59:36 +08:00
placeholders = ' , ' . join ( [ ' %s ' ] * len ( styleIds ) )
2025-07-14 17:46:20 +08:00
query = f " SELECT * FROM contentStyle WHERE id IN ( { placeholders } ) AND isDelete = 0 "
2025-07-15 10:59:36 +08:00
cursor . execute ( query , styleIds )
2025-07-14 17:46:20 +08:00
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
2025-07-15 10:59:36 +08:00
logger . info ( f " 批量查询风格信息: 请求 { len ( styleIds ) } 个,找到 { len ( results ) } 个 " )
2025-07-14 17:46:20 +08:00
return results
except Exception as e :
logger . error ( f " 批量查询风格信息失败: { e } " )
return [ ]
2025-07-15 10:59:36 +08:00
def get_audiences_by_ids ( self , audienceIds : List [ int ] ) - > List [ Dict [ str , Any ] ] :
2025-07-14 17:46:20 +08:00
"""
根据ID列表批量获取受众信息
Args :
2025-07-15 10:59:36 +08:00
audienceIds : 受众ID列表
2025-07-14 17:46:20 +08:00
Returns :
受众信息列表
"""
2025-07-15 10:59:36 +08:00
if not self . db_pool or not audienceIds :
2025-07-14 17:46:20 +08:00
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建IN查询
2025-07-15 10:59:36 +08:00
placeholders = ' , ' . join ( [ ' %s ' ] * len ( audienceIds ) )
2025-07-14 17:46:20 +08:00
query = f " SELECT * FROM targetAudience WHERE id IN ( { placeholders } ) AND isDelete = 0 "
2025-07-15 10:59:36 +08:00
cursor . execute ( query , audienceIds )
2025-07-14 17:46:20 +08:00
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
2025-07-15 10:59:36 +08:00
logger . info ( f " 批量查询受众信息: 请求 { len ( audienceIds ) } 个,找到 { len ( results ) } 个 " )
2025-07-14 17:46:20 +08:00
return results
except Exception as e :
logger . error ( f " 批量查询受众信息失败: { e } " )
return [ ]
def list_all_scenic_spots ( self , user_id : Optional [ int ] = None , is_public : Optional [ bool ] = None ) - > List [ Dict [ str , Any ] ] :
"""
获取所有景区列表
Args :
user_id : 用户ID , 如果提供则只返回该用户的景区
is_public : 是否公开 , 如果提供则过滤公开 / 私有景区
Returns :
景区列表
"""
if not self . db_pool :
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建查询条件
conditions = [ " isDelete = 0 " ]
params = [ ]
if user_id is not None :
conditions . append ( " userId = %s " )
params . append ( user_id )
if is_public is not None :
conditions . append ( " isPublic = %s " )
params . append ( 1 if is_public else 0 )
query = f " SELECT id, name, address, description, advantage, highlight, isPublic, userId FROM scenicSpot WHERE { ' AND ' . join ( conditions ) } ORDER BY name "
cursor . execute ( query , params )
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
logger . info ( f " 获取景区列表: 找到 { len ( results ) } 个景区 " )
return results
except Exception as e :
logger . error ( f " 获取景区列表失败: { e } " )
return [ ]
def list_all_products ( self , user_id : Optional [ int ] = None , is_public : Optional [ bool ] = None ) - > List [ Dict [ str , Any ] ] :
"""
获取所有产品列表
Args :
user_id : 用户ID , 如果提供则只返回该用户的产品
is_public : 是否公开 , 如果提供则过滤公开 / 私有产品
Returns :
产品列表
"""
if not self . db_pool :
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
# 构建查询条件
conditions = [ " isDelete = 0 " ]
params = [ ]
if user_id is not None :
conditions . append ( " userId = %s " )
params . append ( user_id )
if is_public is not None :
conditions . append ( " isPublic = %s " )
params . append ( 1 if is_public else 0 )
query = f " SELECT id, name, originPrice, realPrice, packageInfo, description, advantage, highlight, isPublic, userId FROM product WHERE { ' AND ' . join ( conditions ) } ORDER BY name "
cursor . execute ( query , params )
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
logger . info ( f " 获取产品列表: 找到 { len ( results ) } 个产品 " )
return results
except Exception as e :
logger . error ( f " 获取产品列表失败: { e } " )
return [ ]
def list_all_styles ( self ) - > List [ Dict [ str , Any ] ] :
"""
获取所有风格列表
Returns :
风格列表
"""
if not self . db_pool :
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute ( " SELECT id, styleName, description FROM contentStyle WHERE isDelete = 0 ORDER BY styleName " )
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
logger . info ( f " 获取风格列表: 找到 { len ( results ) } 个风格 " )
return results
except Exception as e :
logger . error ( f " 获取风格列表失败: { e } " )
return [ ]
def list_all_audiences ( self ) - > List [ Dict [ str , Any ] ] :
"""
获取所有受众列表
Returns :
受众列表
"""
if not self . db_pool :
return [ ]
try :
conn = self . db_pool . get_connection ( )
cursor = conn . cursor ( dictionary = True )
cursor . execute ( " SELECT id, audienceName, description FROM targetAudience WHERE isDelete = 0 ORDER BY audienceName " )
results = cursor . fetchall ( )
cursor . close ( )
conn . close ( )
logger . info ( f " 获取受众列表: 找到 { len ( results ) } 个受众 " )
return results
except Exception as e :
logger . error ( f " 获取受众列表失败: { e } " )
return [ ]
def is_available ( self ) - > bool :
"""
检查数据库服务是否可用
Returns :
数据库是否可用
"""
return self . db_pool is not None