#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 数据库访问器 提供按领域组织的数据库访问接口 """ import logging from typing import Dict, Any, Optional, List logger = logging.getLogger(__name__) class ScenicSpotAccessor: """景区数据访问器""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, spot_id: int) -> Optional[Dict[str, Any]]: """根据 ID 查询景区""" try: # 调用现有的 DatabaseService return self._db.get_scenic_spot_by_id(spot_id) except Exception as e: logger.error(f"查询景区失败: {e}") return None async def find_by_name(self, name: str) -> Optional[Dict[str, Any]]: """根据名称查询景区""" try: return self._db.get_scenic_spot_by_name(name) except Exception as e: logger.error(f"查询景区失败: {e}") return None async def find_all(self, limit: int = 100) -> List[Dict[str, Any]]: """查询所有景区""" try: return self._db.get_all_scenic_spots(limit=limit) except Exception as e: logger.error(f"查询景区列表失败: {e}") return [] class ProductAccessor: """产品数据访问器""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, product_id: int) -> Optional[Dict[str, Any]]: """根据 ID 查询产品""" try: return self._db.get_product_by_id(product_id) except Exception as e: logger.error(f"查询产品失败: {e}") return None async def find_by_scenic_spot(self, spot_id: int) -> List[Dict[str, Any]]: """根据景区查询产品""" try: return self._db.get_products_by_scenic_spot(spot_id) except Exception as e: logger.error(f"查询产品列表失败: {e}") return [] class StyleAccessor: """风格数据访问器""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, style_id: int) -> Optional[Dict[str, Any]]: """根据 ID 查询风格""" try: return self._db.get_style_by_id(style_id) except Exception as e: logger.error(f"查询风格失败: {e}") return None async def find_all(self) -> List[Dict[str, Any]]: """查询所有风格""" try: return self._db.get_all_styles() except Exception as e: logger.error(f"查询风格列表失败: {e}") return [] class AudienceAccessor: """受众数据访问器""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, audience_id: int) -> Optional[Dict[str, Any]]: """根据 ID 查询受众""" try: return self._db.get_audience_by_id(audience_id) except Exception as e: logger.error(f"查询受众失败: {e}") return None async def find_all(self) -> List[Dict[str, Any]]: """查询所有受众""" try: return self._db.get_all_audiences() except Exception as e: logger.error(f"查询受众列表失败: {e}") return [] class TemplateAccessor: """模板数据访问器""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, template_id: str) -> Optional[Dict[str, Any]]: """根据 ID 查询模板""" try: return self._db.get_poster_template_by_id(template_id) except Exception as e: logger.error(f"查询模板失败: {e}") return None async def find_active(self) -> List[Dict[str, Any]]: """查询所有激活的模板""" try: return self._db.get_active_poster_templates() except Exception as e: logger.error(f"查询模板列表失败: {e}") return [] class ProductPackageAccessor: """ProductPackage 数据访问器 (ppid 解析)""" def __init__(self, db_service): self._db = db_service async def find_by_id(self, ppid: int) -> Optional[Dict[str, Any]]: """根据 ppid 查询 ProductPackage""" try: if hasattr(self._db, 'get_product_package_by_id'): return self._db.get_product_package_by_id(ppid) else: # 如果没有专门的方法,尝试通用查询 return self._db.execute_query( "SELECT * FROM product_package WHERE id = %s AND is_delete = 0", (ppid,) ) except Exception as e: logger.error(f"查询 ProductPackage 失败: {e}") return None async def resolve_ids(self, ppid: int) -> Optional[Dict[str, int]]: """ 解析 ppid 为原始 ID Args: ppid: ProductPackage ID Returns: {"scenic_spot_id": ..., "product_id": ..., "ppid": ...} """ try: package = await self.find_by_id(ppid) if package: return { "scenic_spot_id": package.get('scenic_spot_id') or package.get('scenicSpotId'), "product_id": package.get('product_id') or package.get('productId'), "ppid": ppid } return None except Exception as e: logger.error(f"解析 ppid 失败: {e}") return None class DatabaseAccessor: """ 数据库访问器 按领域组织数据访问: - db.scenic_spot.find_by_id(id) - db.product.find_by_id(id) - db.style.find_all() - db.audience.find_all() - db.template.find_active() """ def __init__(self, db_service=None): """ 初始化数据库访问器 Args: db_service: 底层 DatabaseService 实例 """ self._db_service = db_service self._scenic_spot: Optional[ScenicSpotAccessor] = None self._product: Optional[ProductAccessor] = None self._style: Optional[StyleAccessor] = None self._audience: Optional[AudienceAccessor] = None self._template: Optional[TemplateAccessor] = None self._product_package: Optional[ProductPackageAccessor] = None def set_db_service(self, db_service): """设置底层数据库服务""" self._db_service = db_service # 重置访问器 self._scenic_spot = None self._product = None self._style = None self._audience = None self._template = None self._product_package = None @property def scenic_spot(self) -> ScenicSpotAccessor: """景区数据访问器""" if self._scenic_spot is None: self._scenic_spot = ScenicSpotAccessor(self._db_service) return self._scenic_spot @property def product(self) -> ProductAccessor: """产品数据访问器""" if self._product is None: self._product = ProductAccessor(self._db_service) return self._product @property def style(self) -> StyleAccessor: """风格数据访问器""" if self._style is None: self._style = StyleAccessor(self._db_service) return self._style @property def audience(self) -> AudienceAccessor: """受众数据访问器""" if self._audience is None: self._audience = AudienceAccessor(self._db_service) return self._audience @property def template(self) -> TemplateAccessor: """模板数据访问器""" if self._template is None: self._template = TemplateAccessor(self._db_service) return self._template @property def product_package(self) -> ProductPackageAccessor: """ProductPackage 数据访问器 (ppid)""" if self._product_package is None: self._product_package = ProductPackageAccessor(self._db_service) return self._product_package async def resolve_ppid(self, ppid: int) -> Optional[Dict[str, int]]: """ 解析 ppid 为原始 ID (便捷方法) Args: ppid: ProductPackage ID Returns: {"scenic_spot_id": ..., "product_id": ..., "ppid": ...} """ return await self.product_package.resolve_ids(ppid)