TravelContentCreator/api/services/database_service.py

1055 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库服务层 - 改进版本
负责景区、产品、风格、受众等数据的查询和管理
"""
import logging
import time
import traceback
import os
import json
from typing import Dict, Any, Optional, List, Tuple
import mysql.connector
from mysql.connector import pooling
logger = logging.getLogger(__name__)
class DatabaseService:
"""数据库服务类 - 改进版本"""
def __init__(self, config_manager=None):
"""
初始化数据库服务
Args:
config_manager: 配置管理器(可选)
"""
self.config_manager = config_manager
self.db_pool = self._init_db_pool()
# 添加连接状态检查
if self.db_pool:
logger.info("数据库服务初始化成功")
else:
logger.error("数据库服务初始化失败")
def _load_database_config(self) -> Dict[str, Any]:
"""
加载数据库配置 - 改进的配置加载逻辑
Returns:
数据库配置字典
"""
db_config = {}
# 方法1尝试从配置管理器获取
if self.config_manager:
try:
raw_config = self.config_manager.get_raw_config('database')
if raw_config:
db_config = self._process_env_vars(raw_config)
logger.info("从配置管理器加载数据库配置成功")
else:
logger.warning("配置管理器中未找到数据库配置")
except Exception as e:
logger.warning(f"从配置管理器加载数据库配置失败: {e}")
# 方法2如果配置管理器失败尝试直接读取配置文件
if not db_config:
try:
config_path = os.path.join(os.path.dirname(__file__), '../../config/database.json')
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
raw_config = json.load(f)
db_config = self._process_env_vars(raw_config)
logger.info("直接从配置文件加载数据库配置成功")
else:
logger.warning(f"配置文件不存在: {config_path}")
except Exception as e:
logger.warning(f"直接读取配置文件失败: {e}")
# 方法3使用环境变量作为兜底
if not db_config:
db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD', 'civmek-rezTed-0hovre'),
'database': os.getenv('DB_NAME', 'travel_content'),
'port': int(os.getenv('DB_PORT', '3306')),
'charset': 'utf8mb4',
'pool_size': 10
}
logger.info("使用环境变量和默认值构建数据库配置")
return db_config
def _init_db_pool(self):
"""初始化数据库连接池 - 改进版本"""
try:
# 获取数据库配置
db_config = self._load_database_config()
if not db_config:
logger.error("无法获取数据库配置")
return None
# 打印连接信息(不包含密码)
connection_info = {k: v for k, v in db_config.items() if k != 'password'}
logger.info(f"尝试连接数据库: {connection_info}")
# 从配置中分离MySQL连接池支持的参数和不支持的参数
config = db_config.copy()
# MySQL连接池不支持的参数需要移除
unsupported_params = [
'max_retry_attempts', 'query_timeout', 'soft_delete_field',
'active_record_value', 'pool_size'
]
# 提取连接池大小
pool_size = config.pop('pool_size', 10)
# 移除不支持的参数
for param in unsupported_params:
config.pop(param, None)
# 确保必要的参数存在且正确
config.setdefault('autocommit', True)
config.setdefault('raise_on_warnings', True)
# 创建连接池
pool = pooling.MySQLConnectionPool(
pool_name=f"database_service_pool_{int(time.time())}",
pool_size=pool_size,
**config
)
# 测试连接
test_conn = pool.get_connection()
try:
cursor = test_conn.cursor()
cursor.execute("SELECT 1 as test")
result = cursor.fetchone()
cursor.close()
logger.info(f"数据库连接测试成功,测试结果: {result}")
finally:
test_conn.close()
logger.info(f"数据库连接池初始化成功,连接池大小: {pool_size}")
return pool
except Exception as e:
error_details = traceback.format_exc()
logger.error(f"数据库连接池初始化失败: {e}\n{error_details}")
return None
def _process_env_vars(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""处理环境变量 - 改进版本"""
if not config:
return {}
processed_config = {}
for key, value in config.items():
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
env_value = os.environ.get(env_var, "")
processed_config[key] = env_value
if env_value:
logger.debug(f"环境变量 {env_var} 被成功替换")
else:
logger.warning(f"环境变量 {env_var} 未设置")
else:
processed_config[key] = value
return processed_config
def _execute_query(self, query: str, params: Optional[Tuple] = None, fetch_one: bool = False) -> Optional[Any]:
"""
执行数据库查询的通用方法
Args:
query: SQL查询语句
params: 查询参数
fetch_one: 是否只获取一条记录
Returns:
查询结果
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(query, params or ())
if fetch_one:
result = cursor.fetchone()
else:
result = cursor.fetchall()
return result
except Exception as e:
logger.error(f"执行查询失败: {query[:100]}... 错误: {e}")
return None
def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取单个景区信息 - 改进版本"""
logger.debug(f"查询景区信息: ID={spot_id}")
query = "SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (spot_id,), fetch_one=True)
if result:
logger.info(f"成功找到景区信息: ID={spot_id}, 名称={result.get('name', 'Unknown')}")
return result
else:
logger.warning(f"未找到景区信息: ID={spot_id}")
return None
def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取单个产品信息 - 改进版本"""
logger.debug(f"查询产品信息: ID={product_id}")
query = "SELECT * FROM product WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (product_id,), fetch_one=True)
if result:
logger.info(f"成功找到产品信息: ID={product_id}, 名称={result.get('productName', 'Unknown')}")
return result
else:
logger.warning(f"未找到产品信息: ID={product_id}")
return None
def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取风格信息 - 改进版本
Args:
style_id: 风格ID
Returns:
风格信息字典如果未找到则返回None
"""
logger.debug(f"查询风格信息: ID={style_id}")
query = "SELECT * FROM contentStyle WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (style_id,), fetch_one=True)
if result:
logger.info(f"成功找到风格信息: ID={style_id}, 名称={result.get('styleName', 'Unknown')}")
return result
else:
logger.warning(f"未找到风格信息: ID={style_id}")
return None
def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取受众信息 - 改进版本
Args:
audience_id: 受众ID
Returns:
受众信息字典如果未找到则返回None
"""
logger.debug(f"查询受众信息: ID={audience_id}")
query = "SELECT * FROM targetAudience WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (audience_id,), fetch_one=True)
if result:
logger.info(f"成功找到受众信息: ID={audience_id}, 名称={result.get('audienceName', 'Unknown')}")
return result
else:
logger.warning(f"未找到受众信息: ID={audience_id}")
return None
def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取景区信息 - 改进版本
Args:
spot_ids: 景区ID列表
Returns:
景区信息列表
"""
if not spot_ids:
logger.warning("景区ID列表为空")
return []
logger.info(f"批量查询景区信息: IDs={spot_ids}") # 添加具体ID信息
placeholders = ','.join(['%s'] * len(spot_ids))
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
# 添加查询详情日志
logger.debug(f"执行景区查询SQL: {query} with params: {spot_ids}")
results = self._execute_query(query, tuple(spot_ids))
if results is None:
logger.error("批量查询景区信息失败")
return self._get_fallback_scenic_spots(spot_ids)
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(spot_ids) - found_ids
if missing_ids:
logger.warning(f"部分景区ID未找到: {missing_ids}")
logger.info(f"数据库中实际找到的景区ID: {found_ids}")
# 添加兜底数据
fallback_spots = self._get_fallback_scenic_spots(list(missing_ids))
results.extend(fallback_spots)
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len([r for r in results if r['id'] in spot_ids])}")
return results
def _get_fallback_scenic_spots(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
"""
获取景区的兜底数据 - 改进版本
"""
fallback_spots = []
for spot_id in spot_ids:
fallback_spots.append({
'id': spot_id,
'name': f'景区{spot_id}',
'address': '默认地址',
'advantage': '优美的自然风光',
'highlight': '值得一游的景点',
'isPublic': 1,
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底景区数据: {len(fallback_spots)}")
return fallback_spots
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取产品信息 - 改进版本
Args:
productIds: 产品ID列表
Returns:
产品信息列表
"""
if not productIds:
logger.warning("产品ID列表为空")
return []
logger.debug(f"批量查询产品信息: IDs={productIds}")
placeholders = ','.join(['%s'] * len(productIds))
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
results = self._execute_query(query, tuple(productIds))
if results is None:
logger.error("批量查询产品信息失败")
return self._get_fallback_products(productIds)
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(productIds) - found_ids
if missing_ids:
logger.warning(f"部分产品ID未找到: {missing_ids}")
# 添加兜底数据
fallback_products = self._get_fallback_products(list(missing_ids))
results.extend(fallback_products)
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len([r for r in results if r['id'] in productIds])}")
return results
def _get_fallback_products(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
获取产品的兜底数据 - 改进版本,使用正确的字段名
"""
fallback_products = []
for product_id in productIds:
fallback_products.append({
'id': product_id,
'productName': f'产品{product_id}',
'originPrice': '999',
'realPrice': '888',
'packageInfo': '包含景区门票和导游服务',
'keyAdvantages': '性价比高,服务优质',
'highlights': '精品线路,专业导游',
'detailedDescription': '详细的产品描述',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底产品数据: {len(fallback_products)}")
return fallback_products
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取风格信息 - 改进版本
Args:
styleIds: 风格ID列表
Returns:
风格信息列表
"""
if not styleIds:
logger.warning("风格ID列表为空")
return []
logger.debug(f"批量查询风格信息: IDs={styleIds}")
placeholders = ','.join(['%s'] * len(styleIds))
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
results = self._execute_query(query, tuple(styleIds))
if results is None:
logger.error("批量查询风格信息失败")
return self._get_fallback_styles(styleIds)
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(styleIds) - found_ids
if missing_ids:
logger.warning(f"部分风格ID未找到: {missing_ids}")
# 添加兜底数据
fallback_styles = self._get_fallback_styles(list(missing_ids))
results.extend(fallback_styles)
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len([r for r in results if r['id'] in styleIds])}")
return results
def _get_fallback_styles(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
获取风格的兜底数据 - 改进版本
"""
fallback_styles = []
for style_id in styleIds:
fallback_styles.append({
'id': style_id,
'styleName': f'风格{style_id}',
'description': '默认风格描述',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底风格数据: {len(fallback_styles)}")
return fallback_styles
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取受众信息 - 改进版本
Args:
audienceIds: 受众ID列表
Returns:
受众信息列表
"""
if not audienceIds:
logger.warning("受众ID列表为空")
return []
logger.debug(f"批量查询受众信息: IDs={audienceIds}")
placeholders = ','.join(['%s'] * len(audienceIds))
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
results = self._execute_query(query, tuple(audienceIds))
if results is None:
logger.error("批量查询受众信息失败")
return self._get_fallback_audiences(audienceIds)
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(audienceIds) - found_ids
if missing_ids:
logger.warning(f"部分受众ID未找到: {missing_ids}")
# 添加兜底数据
fallback_audiences = self._get_fallback_audiences(list(missing_ids))
results.extend(fallback_audiences)
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len([r for r in results if r['id'] in audienceIds])}")
return results
def _get_fallback_audiences(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
获取受众的兜底数据 - 改进版本
"""
fallback_audiences = []
for audience_id in audienceIds:
fallback_audiences.append({
'id': audience_id,
'audienceName': f'受众{audience_id}',
'description': '默认受众描述',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底受众数据: {len(fallback_audiences)}")
return fallback_audiences
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有景区列表 - 改进版本
Args:
user_id: 用户ID如果提供则只返回该用户的景区
is_public: 是否公开,如果提供则过滤公开/私有景区
Returns:
景区列表
"""
logger.debug(f"获取景区列表: user_id={user_id}, is_public={is_public}")
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"""
SELECT id, name, address, trafficInfo, description,
advantage, highlight, isPublic, userId
FROM scenicSpot
WHERE {' AND '.join(conditions)}
ORDER BY name
"""
results = self._execute_query(query, tuple(params))
if results is None:
logger.error("获取景区列表失败")
return []
logger.info(f"获取景区列表: 找到{len(results)}个景区")
return results
def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有产品列表 - 改进版本
Args:
user_id: 用户ID如果提供则只返回该用户的产品
is_public: 是否公开,如果提供则过滤公开/私有产品
Returns:
产品列表
"""
logger.debug(f"获取产品列表: user_id={user_id}, is_public={is_public}")
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"""
SELECT id, productName, originPrice, realPrice, packageInfo,
detailedDescription, keyAdvantages, highlights,
usageRules, surcharge, reservation, refund, discounts,
isPublic, userId
FROM product
WHERE {' AND '.join(conditions)}
ORDER BY productName
"""
results = self._execute_query(query, tuple(params))
if results is None:
logger.error("获取产品列表失败")
return []
logger.info(f"获取产品列表: 找到{len(results)}个产品")
return results
def list_all_styles(self) -> List[Dict[str, Any]]:
"""
获取所有风格列表 - 改进版本
Returns:
风格列表
"""
logger.debug("获取风格列表")
query = "SELECT id, styleName, description FROM contentStyle WHERE isDelete = 0 ORDER BY styleName"
results = self._execute_query(query)
if results is None:
logger.error("获取风格列表失败")
return []
logger.info(f"获取风格列表: 找到{len(results)}个风格")
return results
def list_all_audiences(self) -> List[Dict[str, Any]]:
"""
获取所有受众列表 - 改进版本
Returns:
受众列表
"""
logger.debug("获取受众列表")
query = "SELECT id, audienceName, description FROM targetAudience WHERE isDelete = 0 ORDER BY audienceName"
results = self._execute_query(query)
if results is None:
logger.error("获取受众列表失败")
return []
logger.info(f"获取受众列表: 找到{len(results)}个受众")
return results
def is_available(self) -> bool:
"""
检查数据库服务是否可用 - 改进版本
Returns:
数据库是否可用
"""
if not self.db_pool:
return False
try:
# 执行简单的测试查询
result = self._execute_query("SELECT 1 as test", fetch_one=True)
return result is not None and result.get('test') == 1
except Exception as e:
logger.error(f"数据库可用性检查失败: {e}")
return False
# 名称到ID的反向查询方法 - 改进版本
def get_style_id_by_name(self, style_name: str) -> Optional[int]:
"""
根据风格名称获取风格ID - 改进版本
Args:
style_name: 风格名称
Returns:
风格ID如果未找到则返回None
"""
logger.debug(f"根据名称查询风格ID: {style_name}")
query = "SELECT id FROM contentStyle WHERE styleName = %s AND isDelete = 0"
result = self._execute_query(query, (style_name,), fetch_one=True)
if result:
style_id = result['id']
logger.info(f"找到风格ID: {style_name} -> {style_id}")
return style_id
else:
logger.warning(f"未找到风格: {style_name}")
return None
def get_audience_id_by_name(self, audience_name: str) -> Optional[int]:
"""
根据受众名称获取受众ID - 改进版本
Args:
audience_name: 受众名称
Returns:
受众ID如果未找到则返回None
"""
logger.debug(f"根据名称查询受众ID: {audience_name}")
query = "SELECT id FROM targetAudience WHERE audienceName = %s AND isDelete = 0"
result = self._execute_query(query, (audience_name,), fetch_one=True)
if result:
audience_id = result['id']
logger.info(f"找到受众ID: {audience_name} -> {audience_id}")
return audience_id
else:
logger.warning(f"未找到受众: {audience_name}")
return None
def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]:
"""
根据景区名称获取景区ID - 改进版本
Args:
spot_name: 景区名称
Returns:
景区ID如果未找到则返回None
"""
logger.debug(f"根据名称查询景区ID: {spot_name}")
query = "SELECT id FROM scenicSpot WHERE name = %s AND isDelete = 0"
result = self._execute_query(query, (spot_name,), fetch_one=True)
if result:
spot_id = result['id']
logger.info(f"找到景区ID: {spot_name} -> {spot_id}")
return spot_id
else:
logger.warning(f"未找到景区: {spot_name}")
return None
def get_product_id_by_name(self, product_name: str) -> Optional[int]:
"""
根据产品名称获取产品ID - 改进版本
Args:
product_name: 产品名称
Returns:
产品ID如果未找到则返回None
"""
logger.debug(f"根据名称查询产品ID: {product_name}")
query = "SELECT id FROM product WHERE productName = %s AND isDelete = 0"
result = self._execute_query(query, (product_name,), fetch_one=True)
if result:
product_id = result['id']
logger.info(f"找到产品ID: {product_name} -> {product_id}")
return product_id
else:
logger.warning(f"未找到产品: {product_name}")
return None
def get_image_by_id(self, image_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取图像信息 - 改进版本
Args:
image_id: 图像ID
Returns:
图像信息字典如果未找到则返回None
"""
logger.debug(f"查询图像信息: ID={image_id}")
query = "SELECT * FROM material WHERE id = %s AND materialType = 'image' AND isDelete = 0"
result = self._execute_query(query, (image_id,), fetch_one=True)
if result:
logger.info(f"成功找到图像信息: ID={image_id}, 名称={result.get('materialName', 'Unknown')}")
return result
else:
logger.warning(f"未找到图像信息: ID={image_id}")
return None
def get_images_by_ids(self, image_ids: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取图像信息 - 改进版本
Args:
image_ids: 图像ID列表
Returns:
图像信息列表
"""
if not image_ids:
return []
logger.debug(f"批量查询图像信息: IDs={image_ids}")
placeholders = ','.join(['%s'] * len(image_ids))
query = f"SELECT * FROM material WHERE id IN ({placeholders}) AND materialType = 'image' AND isDelete = 0"
results = self._execute_query(query, tuple(image_ids))
if results is None:
logger.error("批量查询图像信息失败")
return []
logger.info(f"批量查询图像信息: 请求{len(image_ids)}个,找到{len(results)}")
return results
def get_content_by_id(self, content_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取内容信息 - 改进版本
Args:
content_id: 内容ID
Returns:
内容信息字典如果未找到则返回None
"""
logger.debug(f"查询内容信息: ID={content_id}")
query = "SELECT * FROM content WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (content_id,), fetch_one=True)
if result:
logger.info(f"成功找到内容信息: ID={content_id}, 标题={result.get('title', 'Unknown')}")
return result
else:
logger.warning(f"未找到内容信息: ID={content_id}")
return None
def get_content_by_topic_index(self, topic_index: str) -> Optional[Dict[str, Any]]:
"""根据主题索引获取内容信息 - 改进版本"""
logger.debug(f"根据主题索引查询内容信息: topic_index={topic_index}")
query = "SELECT * FROM content WHERE topicIndex = %s AND isDelete = 0 ORDER BY createTime DESC LIMIT 1"
result = self._execute_query(query, (topic_index,), fetch_one=True)
if result:
logger.info(f"成功找到内容信息: topicIndex={topic_index}, 标题={result.get('title', 'Unknown')}")
return result
else:
logger.warning(f"未找到内容信息: topicIndex={topic_index}")
return None
def get_images_by_folder_id(self, folder_id: int) -> List[Dict[str, Any]]:
"""
根据文件夹ID获取图像列表 - 改进版本
Args:
folder_id: 文件夹ID
Returns:
图像信息列表
"""
logger.debug(f"根据文件夹ID获取图像: folder_id={folder_id}")
query = "SELECT * FROM material WHERE folderId = %s AND materialType = 'image' AND isDelete = 0 ORDER BY createTime DESC"
results = self._execute_query(query, (folder_id,))
if results is None:
logger.error(f"根据文件夹ID获取图像失败: folder_id={folder_id}")
return []
logger.info(f"根据文件夹ID获取图像: folderId={folder_id}, 找到{len(results)}个图像")
return results
def get_folder_by_id(self, folder_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取文件夹信息 - 改进版本
Args:
folder_id: 文件夹ID
Returns:
文件夹信息字典如果未找到则返回None
"""
logger.debug(f"查询文件夹信息: ID={folder_id}")
query = "SELECT * FROM material_folder WHERE id = %s AND isDelete = 0"
result = self._execute_query(query, (folder_id,), fetch_one=True)
if result:
logger.info(f"成功找到文件夹信息: ID={folder_id}, 名称={result.get('folderName', 'Unknown')}")
return result
else:
logger.warning(f"未找到文件夹信息: ID={folder_id}")
return None
def get_related_images_for_content(self, content_id: int, limit: int = 10) -> List[Dict[str, Any]]:
"""
获取与内容相关的图像列表 - 改进版本
Args:
content_id: 内容ID
limit: 限制数量
Returns:
相关图像列表
"""
logger.debug(f"获取相关内容图像: content_id={content_id}, limit={limit}")
# 首先检查内容是否存在
content_query = "SELECT * FROM content WHERE id = %s AND isDelete = 0"
content = self._execute_query(content_query, (content_id,), fetch_one=True)
if not content:
logger.warning(f"内容不存在: content_id={content_id}")
return []
# 获取相关的图像(这里可以根据业务逻辑调整查询条件)
images_query = "SELECT * FROM material WHERE materialType = 'image' AND isDelete = 0 ORDER BY RAND() LIMIT %s"
results = self._execute_query(images_query, (limit,))
if results is None:
logger.error(f"获取相关内容图像失败: content_id={content_id}")
return []
logger.info(f"获取相关内容图像: contentId={content_id}, 找到{len(results)}个图像")
return results
# 模板相关查询方法 - 改进版本
def get_all_poster_templates(self) -> List[Dict[str, Any]]:
"""
获取所有海报模板 - 改进版本
Returns:
模板列表
"""
logger.debug("获取所有海报模板")
query = "SELECT * FROM poster_templates ORDER BY created_at"
results = self._execute_query(query)
if results is None:
logger.error("获取海报模板列表失败")
return []
logger.info(f"获取海报模板列表: 找到{len(results)}个模板")
return results
def get_poster_template_by_id(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
根据ID获取海报模板信息 - 改进版本
Args:
template_id: 模板ID
Returns:
模板信息字典如果未找到则返回None
"""
logger.debug(f"查询模板信息: ID={template_id}")
query = "SELECT * FROM poster_templates WHERE id = %s"
result = self._execute_query(query, (template_id,), fetch_one=True)
if result:
logger.info(f"成功找到模板信息: ID={template_id}, 名称={result.get('name', 'Unknown')}")
return result
else:
logger.warning(f"未找到模板信息: ID={template_id}")
return None
def get_active_poster_templates(self) -> List[Dict[str, Any]]:
"""
获取所有激活的海报模板 - 改进版本
Returns:
激活的模板列表
"""
logger.debug("获取激活的海报模板")
query = "SELECT * FROM poster_templates WHERE is_active = 1 ORDER BY created_at"
results = self._execute_query(query)
if results is None:
logger.error("获取激活模板列表失败")
return []
logger.info(f"获取激活模板列表: 找到{len(results)}个模板")
return results
def update_template_usage_stats(self, template_id: str, success: bool = True, processing_time: float = 0.0):
"""
更新模板使用统计 - 改进版本
Args:
template_id: 模板ID
success: 是否成功
processing_time: 处理时间(秒)
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return
try:
with self.db_pool.get_connection() as conn:
with conn.cursor() as cursor:
# 更新或插入统计记录
cursor.execute("""
INSERT INTO templateUsageStats (templateId, usageCount, successCount, errorCount, avgProcessingTime, lastUsedAt)
VALUES (%s, 1, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
usageCount = usageCount + 1,
successCount = successCount + %s,
errorCount = errorCount + %s,
avgProcessingTime = (avgProcessingTime * (usageCount - 1) + %s) / usageCount,
lastUsedAt = NOW(),
updatedAt = NOW()
""", (
template_id,
1 if success else 0,
0 if success else 1,
processing_time,
1 if success else 0,
0 if success else 1,
processing_time
))
conn.commit()
logger.info(f"更新模板使用统计成功: template_id={template_id}, success={success}, time={processing_time:.3f}s")
except Exception as e:
logger.error(f"更新模板使用统计失败: {e}")
def get_template_usage_stats(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
获取模板使用统计 - 改进版本
Args:
template_id: 模板ID
Returns:
使用统计信息
"""
logger.debug(f"查询模板使用统计: template_id={template_id}")
query = "SELECT * FROM templateUsageStats WHERE templateId = %s"
result = self._execute_query(query, (template_id,), fetch_one=True)
if result:
logger.info(f"成功找到模板使用统计: template_id={template_id}")
return result
else:
logger.warning(f"未找到模板使用统计: template_id={template_id}")
return None
def get_connection_info(self) -> Dict[str, Any]:
"""
获取数据库连接信息 - 用于调试
Returns:
连接信息字典
"""
if not self.db_pool:
return {"status": "disconnected", "pool": None}
try:
# 获取连接池状态
pool_info = {
"status": "connected",
"pool_name": getattr(self.db_pool, '_pool_name', 'unknown'),
"pool_size": getattr(self.db_pool, '_pool_size', 'unknown'),
"active_connections": len(getattr(self.db_pool, '_cnx_queue', [])) if hasattr(self.db_pool, '_cnx_queue') else 'unknown'
}
# 测试连接
test_result = self._execute_query("SELECT VERSION() as version, DATABASE() as database", fetch_one=True)
if test_result:
pool_info.update({
"mysql_version": test_result.get('version', 'unknown'),
"current_database": test_result.get('database', 'unknown')
})
return pool_info
except Exception as e:
logger.error(f"获取连接信息失败: {e}")
return {"status": "error", "error": str(e)}