TravelContentCreator/api/services/database_service.py

969 lines
35 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库服务层
负责景区产品风格受众等数据的查询和管理
"""
import logging
import time
import traceback
from typing import Dict, Any, Optional, List, Tuple
import mysql.connector
from mysql.connector import pooling
from core.config import ConfigManager
logger = logging.getLogger(__name__)
class DatabaseService:
"""数据库服务类"""
def __init__(self, config_manager: ConfigManager):
"""
初始化数据库服务
Args:
config_manager: 配置管理器
"""
self.config_manager = config_manager
self.db_pool = self._init_db_pool()
def _init_db_pool(self):
"""初始化数据库连接池"""
# 获取数据库配置
raw_db_config = self.config_manager.get_raw_config('database')
# 处理环境变量
db_config = self._process_env_vars(raw_db_config)
# 连接尝试配置
connection_attempts = [
{"desc": "使用配置文件中的设置", "config": db_config},
{"desc": "使用明确的密码", "config": {**db_config, "password": "password"}},
{"desc": "使用空密码", "config": {**db_config, "password": ""}},
{"desc": "使用auth_plugin", "config": {**db_config, "auth_plugin": "mysql_native_password"}}
]
# 尝试不同的连接方式
for attempt in connection_attempts:
try:
# 打印连接信息(不包含密码)
connection_info = {k: v for k, v in attempt["config"].items() if k != 'password'}
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
# 创建连接池
pool = pooling.MySQLConnectionPool(
pool_name=f"database_service_pool_{int(time.time())}",
pool_size=10,
**attempt["config"]
)
# 测试连接
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchall()
logger.info(f"数据库连接池初始化成功 ({attempt['desc']})")
return pool
except Exception as e:
error_details = traceback.format_exc()
logger.error(f"数据库连接尝试 ({attempt['desc']}) 失败: {e}\n{error_details}")
logger.error("所有数据库连接尝试都失败了")
return None
def _process_env_vars(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""处理环境变量"""
import os
processed_config = {}
for key, value in config.items():
if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
env_var = value[2:-1]
processed_config[key] = os.environ.get(env_var, "")
else:
processed_config[key] = value
return processed_config
def get_scenic_spot_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]:
2025-07-25 10:59:54 +08:00
"""根据ID获取单个景区信息"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM scenicSpot WHERE id = %s AND isDelete = 0",
(spot_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到景区信息: ID={spot_id}, 名称={result['name']}")
return result
else:
logger.warning(f"未找到景区信息: ID={spot_id}")
return None
except Exception as e:
logger.error(f"查询景区信息失败: {e}")
return None
def get_product_by_id(self, product_id: int) -> Optional[Dict[str, Any]]:
2025-07-25 10:59:54 +08:00
"""根据ID获取单个产品信息"""
if not self.db_pool:
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT * FROM product WHERE id = %s AND isDelete = 0", (product_id,))
result = cursor.fetchone()
if result:
logger.info(f"找到产品信息: ID={product_id}, 名称={result['productName']}")
return result
else:
logger.warning(f"未找到产品信息: ID={product_id}")
return None
except Exception as e:
logger.error(f"查询产品信息失败: {e}")
return None
def get_style_by_id(self, style_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取风格信息
Args:
style_id: 风格ID
Returns:
风格信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM contentStyle WHERE id = %s AND isDelete = 0",
(style_id,)
)
2025-07-25 10:59:54 +08:00
result = cursor.fetchone()
if result:
logger.info(f"找到风格信息: ID={style_id}, 名称={result['styleName']}")
return result
else:
logger.warning(f"未找到风格信息: ID={style_id}")
return None
except Exception as e:
logger.error(f"查询风格信息失败: {e}")
return None
def get_audience_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取受众信息
Args:
audience_id: 受众ID
Returns:
受众信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM targetAudience WHERE id = %s AND isDelete = 0",
(audience_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到受众信息: ID={audience_id}, 名称={result['audienceName']}")
return result
else:
logger.warning(f"未找到受众信息: ID={audience_id}")
return None
except Exception as e:
logger.error(f"查询受众信息失败: {e}")
return None
def get_scenic_spots_by_ids(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取景区信息
Args:
spot_ids: 景区ID列表
Returns:
景区信息列表
"""
if not self.db_pool or not spot_ids:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建IN查询
placeholders = ','.join(['%s'] * len(spot_ids))
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, spot_ids)
results = cursor.fetchall()
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询景区信息失败: {e}")
return []
2025-07-15 10:59:36 +08:00
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取产品信息
Args:
2025-07-15 10:59:36 +08:00
productIds: 产品ID列表
Returns:
产品信息列表
"""
2025-07-15 10:59:36 +08:00
if not self.db_pool or not productIds:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建IN查询
placeholders = ','.join(['%s'] * len(productIds))
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, productIds)
results = cursor.fetchall()
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询产品信息失败: {e}")
return []
2025-07-15 10:59:36 +08:00
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取风格信息
Args:
2025-07-15 10:59:36 +08:00
styleIds: 风格ID列表
Returns:
风格信息列表
"""
2025-07-15 10:59:36 +08:00
if not self.db_pool or not styleIds:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建IN查询
placeholders = ','.join(['%s'] * len(styleIds))
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, styleIds)
results = cursor.fetchall()
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询风格信息失败: {e}")
return []
2025-07-15 10:59:36 +08:00
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取受众信息
Args:
2025-07-15 10:59:36 +08:00
audienceIds: 受众ID列表
Returns:
受众信息列表
"""
2025-07-15 10:59:36 +08:00
if not self.db_pool or not audienceIds:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建IN查询
placeholders = ','.join(['%s'] * len(audienceIds))
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, audienceIds)
results = cursor.fetchall()
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len(results)}")
return results
except Exception as e:
logger.error(f"批量查询受众信息失败: {e}")
return []
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有景区列表
Args:
user_id: 用户ID如果提供则只返回该用户的景区
is_public: 是否公开如果提供则过滤公开/私有景区
Returns:
景区列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"""
SELECT id, name, address, trafficInfo, description,
advantage, highlight, isPublic, userId
FROM scenicSpot
WHERE {' AND '.join(conditions)}
ORDER BY name
"""
cursor.execute(query, params)
results = cursor.fetchall()
logger.info(f"获取景区列表: 找到{len(results)}个景区")
return results
except Exception as e:
logger.error(f"获取景区列表失败: {e}")
return []
def list_all_products(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""
获取所有产品列表
Args:
user_id: 用户ID如果提供则只返回该用户的产品
is_public: 是否公开如果提供则过滤公开/私有产品
Returns:
产品列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建查询条件
conditions = ["isDelete = 0"]
params = []
if user_id is not None:
conditions.append("userId = %s")
params.append(user_id)
if is_public is not None:
conditions.append("isPublic = %s")
params.append(1 if is_public else 0)
query = f"""
SELECT id, productName, originPrice, realPrice, packageInfo,
detailedDescription, keyAdvantages, highlights,
usageRules, surcharge, reservation, refund, discounts,
isPublic, userId
FROM product
WHERE {' AND '.join(conditions)}
ORDER BY productName
"""
cursor.execute(query, params)
results = cursor.fetchall()
logger.info(f"获取产品列表: 找到{len(results)}个产品")
return results
except Exception as e:
logger.error(f"获取产品列表失败: {e}")
return []
def list_all_styles(self) -> List[Dict[str, Any]]:
"""
获取所有风格列表
Returns:
风格列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT id, styleName, description FROM contentStyle WHERE isDelete = 0 ORDER BY styleName")
results = cursor.fetchall()
logger.info(f"获取风格列表: 找到{len(results)}个风格")
return results
except Exception as e:
logger.error(f"获取风格列表失败: {e}")
return []
def list_all_audiences(self) -> List[Dict[str, Any]]:
"""
获取所有受众列表
Returns:
受众列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute("SELECT id, audienceName, description FROM targetAudience WHERE isDelete = 0 ORDER BY audienceName")
results = cursor.fetchall()
logger.info(f"获取受众列表: 找到{len(results)}个受众")
return results
except Exception as e:
logger.error(f"获取受众列表失败: {e}")
return []
def is_available(self) -> bool:
"""
检查数据库服务是否可用
Returns:
数据库是否可用
"""
2025-07-16 16:38:38 +08:00
return self.db_pool is not None
# 名称到ID的反向查询方法
def get_style_id_by_name(self, style_name: str) -> Optional[int]:
"""
根据风格名称获取风格ID
Args:
style_name: 风格名称
Returns:
风格ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT id FROM contentStyle WHERE styleName = %s AND isDelete = 0",
2025-07-25 10:59:54 +08:00
(style_name,)
)
result = cursor.fetchone()
if result:
return result['id']
else:
logger.warning(f"未找到风格: {style_name}")
return None
2025-07-16 16:38:38 +08:00
except Exception as e:
logger.error(f"查询风格ID失败: {e}")
return None
def get_audience_id_by_name(self, audience_name: str) -> Optional[int]:
"""
根据受众名称获取受众ID
Args:
audience_name: 受众名称
Returns:
受众ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT id FROM targetAudience WHERE audienceName = %s AND isDelete = 0",
(audience_name,)
)
result = cursor.fetchone()
if result:
return result['id']
else:
logger.warning(f"未找到受众: {audience_name}")
return None
2025-07-16 16:38:38 +08:00
except Exception as e:
logger.error(f"查询受众ID失败: {e}")
return None
def get_scenic_spot_id_by_name(self, spot_name: str) -> Optional[int]:
"""
根据景区名称获取景区ID
Args:
spot_name: 景区名称
Returns:
景区ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT id FROM scenicSpot WHERE name = %s AND isDelete = 0",
(spot_name,)
)
result = cursor.fetchone()
if result:
return result['id']
else:
logger.warning(f"未找到景区: {spot_name}")
return None
2025-07-16 16:38:38 +08:00
except Exception as e:
logger.error(f"查询景区ID失败: {e}")
return None
def get_product_id_by_name(self, product_name: str) -> Optional[int]:
"""
根据产品名称获取产品ID
Args:
product_name: 产品名称
Returns:
产品ID如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT id FROM product WHERE productName = %s AND isDelete = 0",
(product_name,)
)
result = cursor.fetchone()
if result:
return result['id']
else:
logger.warning(f"未找到产品: {product_name}")
return None
2025-07-16 16:38:38 +08:00
except Exception as e:
logger.error(f"查询产品ID失败: {e}")
2025-07-17 16:15:02 +08:00
return None
def get_image_by_id(self, image_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取图像信息
Args:
image_id: 图像ID
Returns:
图像信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM material WHERE id = %s AND materialType = 'image' AND isDelete = 0",
(image_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到图像信息: ID={image_id}, 名称={result['materialName']}")
return result
else:
logger.warning(f"未找到图像信息: ID={image_id}")
return None
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"查询图像信息失败: {e}")
return None
def get_images_by_ids(self, image_ids: List[int]) -> List[Dict[str, Any]]:
"""
根据ID列表批量获取图像信息
Args:
image_ids: 图像ID列表
Returns:
图像信息列表
"""
if not self.db_pool or not image_ids:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 构建IN查询
placeholders = ','.join(['%s'] * len(image_ids))
query = f"SELECT * FROM material WHERE id IN ({placeholders}) AND materialType = 'image' AND isDelete = 0"
cursor.execute(query, image_ids)
results = cursor.fetchall()
logger.info(f"批量查询图像信息: 请求{len(image_ids)}个,找到{len(results)}")
return results
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"批量查询图像信息失败: {e}")
return []
def get_content_by_id(self, content_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取内容信息
Args:
content_id: 内容ID
Returns:
内容信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM content WHERE id = %s AND isDelete = 0",
(content_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到内容信息: ID={content_id}, 标题={result['title']}")
return result
else:
logger.warning(f"未找到内容信息: ID={content_id}")
return None
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"查询内容信息失败: {e}")
return None
def get_content_by_topic_index(self, topic_index: str) -> Optional[Dict[str, Any]]:
2025-07-25 10:59:54 +08:00
"""根据主题索引获取内容信息"""
2025-07-17 16:15:02 +08:00
if not self.db_pool:
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM content WHERE topicIndex = %s AND isDelete = 0 ORDER BY createTime DESC LIMIT 1",
(topic_index,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到内容信息: topicIndex={topic_index}, 标题={result['title']}")
return result
else:
logger.warning(f"未找到内容信息: topicIndex={topic_index}")
return None
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"查询内容信息失败: {e}")
return None
def get_images_by_folder_id(self, folder_id: int) -> List[Dict[str, Any]]:
"""
根据文件夹ID获取图像列表
Args:
folder_id: 文件夹ID
Returns:
图像信息列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM material WHERE folderId = %s AND materialType = 'image' AND isDelete = 0 ORDER BY createTime DESC",
(folder_id,)
)
results = cursor.fetchall()
logger.info(f"根据文件夹ID获取图像: folderId={folder_id}, 找到{len(results)}个图像")
return results
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"根据文件夹ID获取图像失败: {e}")
return []
def get_folder_by_id(self, folder_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取文件夹信息
Args:
folder_id: 文件夹ID
Returns:
文件夹信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM material_folder WHERE id = %s AND isDelete = 0",
(folder_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到文件夹信息: ID={folder_id}, 名称={result['folderName']}")
return result
else:
logger.warning(f"未找到文件夹信息: ID={folder_id}")
return None
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"查询文件夹信息失败: {e}")
return None
def get_related_images_for_content(self, content_id: int, limit: int = 10) -> List[Dict[str, Any]]:
"""
获取与内容相关的图像列表
Args:
content_id: 内容ID
limit: 限制数量
Returns:
相关图像列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
# 获取内容信息
cursor.execute(
"SELECT * FROM content WHERE id = %s AND isDelete = 0",
(content_id,)
)
content = cursor.fetchone()
if not content:
return []
# 获取相关的图像(这里可以根据业务逻辑调整查询条件)
cursor.execute(
"SELECT * FROM material WHERE materialType = 'image' AND isDelete = 0 ORDER BY RAND() LIMIT %s",
(limit,)
)
results = cursor.fetchall()
logger.info(f"获取相关内容图像: contentId={content_id}, 找到{len(results)}个图像")
return results
2025-07-17 16:15:02 +08:00
except Exception as e:
logger.error(f"获取相关内容图像失败: {e}")
2025-07-18 19:32:55 +08:00
return []
# 模板相关查询方法
def get_all_poster_templates(self) -> List[Dict[str, Any]]:
"""
获取所有海报模板
Returns:
模板列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM poster_templates ORDER BY created_at"
2025-07-25 10:59:54 +08:00
)
results = cursor.fetchall()
logger.info(f"获取海报模板列表: 找到{len(results)}个模板")
return results
2025-07-18 19:32:55 +08:00
except Exception as e:
logger.error(f"获取海报模板列表失败: {e}")
return []
def get_poster_template_by_id(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
根据ID获取海报模板信息
Args:
template_id: 模板ID
Returns:
模板信息字典如果未找到则返回None
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM poster_templates WHERE id = %s",
2025-07-25 10:59:54 +08:00
(template_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到模板信息: ID={template_id}, 名称={result['name']}")
return result
else:
logger.warning(f"未找到模板信息: ID={template_id}")
return None
2025-07-18 19:32:55 +08:00
except Exception as e:
logger.error(f"查询模板信息失败: {e}")
return None
def get_active_poster_templates(self) -> List[Dict[str, Any]]:
"""
获取所有激活的海报模板
Returns:
激活的模板列表
"""
if not self.db_pool:
return []
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM poster_templates WHERE is_active = 1 ORDER BY created_at"
2025-07-25 10:59:54 +08:00
)
results = cursor.fetchall()
logger.info(f"获取激活模板列表: 找到{len(results)}个模板")
return results
2025-07-18 19:32:55 +08:00
except Exception as e:
logger.error(f"获取激活模板列表失败: {e}")
return []
def update_template_usage_stats(self, template_id: str, success: bool = True, processing_time: float = 0.0):
"""
更新模板使用统计
Args:
template_id: 模板ID
success: 是否成功
processing_time: 处理时间
"""
if not self.db_pool:
logger.error("数据库连接池未初始化")
return
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
# 更新或插入统计记录
cursor.execute("""
INSERT INTO templateUsageStats (templateId, usageCount, successCount, errorCount, avgProcessingTime, lastUsedAt)
VALUES (%s, 1, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
usageCount = usageCount + 1,
successCount = successCount + %s,
errorCount = errorCount + %s,
avgProcessingTime = (avgProcessingTime * (usageCount - 1) + %s) / usageCount,
lastUsedAt = NOW(),
2025-07-25 17:13:37 +08:00
updatedAt = NOW()
2025-07-18 19:32:55 +08:00
""", (
template_id,
1 if success else 0,
0 if success else 1,
processing_time,
1 if success else 0,
0 if success else 1,
processing_time
))
conn.commit()
cursor.close()
conn.close()
logger.info(f"更新模板使用统计: template_id={template_id}, success={success}, time={processing_time:.3f}s")
except Exception as e:
logger.error(f"更新模板使用统计失败: {e}")
def get_template_usage_stats(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
获取模板使用统计
Args:
template_id: 模板ID
Returns:
使用统计信息
"""
if not self.db_pool:
return None
try:
2025-07-25 10:59:54 +08:00
with self.db_pool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(
"SELECT * FROM templateUsageStats WHERE templateId = %s",
(template_id,)
)
result = cursor.fetchone()
if result:
logger.info(f"找到模板使用统计: template_id={template_id}")
return result
else:
logger.warning(f"未找到模板使用统计: template_id={template_id}")
return None
2025-07-18 19:32:55 +08:00
except Exception as e:
logger.error(f"查询模板使用统计失败: {e}")
return None