309 lines
8.9 KiB
Python
309 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Data Management Router
|
|
数据查询路由 - API v2
|
|
对应原系统的数据库查询功能
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from ..models import ApiResponse
|
|
from ..services import DatabaseService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
# 全局数据库服务实例
|
|
_db_service = None
|
|
|
|
|
|
def get_database_service() -> DatabaseService:
|
|
"""获取数据库服务实例(单例模式)"""
|
|
global _db_service
|
|
if _db_service is None:
|
|
_db_service = DatabaseService()
|
|
return _db_service
|
|
|
|
|
|
@router.get("/scenic-spots", summary="获取景区列表")
|
|
async def get_scenic_spots(
|
|
limit: int = Query(10, ge=1, le=100, description="返回数量限制"),
|
|
offset: int = Query(0, ge=0, description="偏移量"),
|
|
search: Optional[str] = Query(None, description="搜索关键词"),
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""获取景区列表"""
|
|
try:
|
|
spots, total = db_service.get_scenic_spots(limit, offset, search)
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取景区列表成功",
|
|
data={
|
|
"scenic_spots": spots,
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"has_more": offset + limit < total
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取景区列表失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取景区列表失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/scenic-spots/{spot_id}", summary="获取景区详情")
|
|
async def get_scenic_spot(
|
|
spot_id: int,
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""根据ID获取景区详情"""
|
|
try:
|
|
spot = db_service.get_scenic_spot_by_id(spot_id)
|
|
|
|
if not spot:
|
|
raise HTTPException(status_code=404, detail="景区不存在")
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取景区详情成功",
|
|
data=spot
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"获取景区详情失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取景区详情失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/products", summary="获取产品列表")
|
|
async def get_products(
|
|
limit: int = Query(10, ge=1, le=100, description="返回数量限制"),
|
|
offset: int = Query(0, ge=0, description="偏移量"),
|
|
search: Optional[str] = Query(None, description="搜索关键词"),
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""获取产品列表"""
|
|
try:
|
|
products, total = db_service.get_products(limit, offset, search)
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取产品列表成功",
|
|
data={
|
|
"products": products,
|
|
"total": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"has_more": offset + limit < total
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取产品列表失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取产品列表失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/products/{product_id}", summary="获取产品详情")
|
|
async def get_product(
|
|
product_id: int,
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""根据ID获取产品详情"""
|
|
try:
|
|
product = db_service.get_product_by_id(product_id)
|
|
|
|
if not product:
|
|
raise HTTPException(status_code=404, detail="产品不存在")
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取产品详情成功",
|
|
data=product
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"获取产品详情失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取产品详情失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/styles", summary="获取风格列表")
|
|
async def get_styles(
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""获取风格列表"""
|
|
try:
|
|
styles = db_service.get_styles()
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取风格列表成功",
|
|
data={"styles": styles, "total": len(styles)}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取风格列表失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取风格列表失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/audiences", summary="获取受众列表")
|
|
async def get_audiences(
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""获取受众列表"""
|
|
try:
|
|
audiences = db_service.get_audiences()
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="获取受众列表成功",
|
|
data={"audiences": audiences, "total": len(audiences)}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取受众列表失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="获取受众列表失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.post("/scenic-spots/batch", summary="批量获取景区信息")
|
|
async def get_scenic_spots_batch(
|
|
spot_ids: List[int],
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""批量获取景区信息"""
|
|
try:
|
|
spots = db_service.get_scenic_spots_by_ids(spot_ids)
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message=f"批量获取 {len(spots)} 个景区信息成功",
|
|
data={
|
|
"scenic_spots": spots,
|
|
"requested_count": len(spot_ids),
|
|
"found_count": len(spots)
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"批量获取景区信息失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="批量获取景区信息失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.post("/products/batch", summary="批量获取产品信息")
|
|
async def get_products_batch(
|
|
product_ids: List[int],
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""批量获取产品信息"""
|
|
try:
|
|
products = db_service.get_products_by_ids(product_ids)
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message=f"批量获取 {len(products)} 个产品信息成功",
|
|
data={
|
|
"products": products,
|
|
"requested_count": len(product_ids),
|
|
"found_count": len(products)
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"批量获取产品信息失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="批量获取产品信息失败",
|
|
error=str(e)
|
|
).dict()
|
|
)
|
|
|
|
|
|
@router.get("/health", summary="数据库健康检查")
|
|
async def database_health_check(
|
|
db_service: DatabaseService = Depends(get_database_service)
|
|
):
|
|
"""检查数据库连接状态"""
|
|
try:
|
|
is_available = db_service.is_available()
|
|
|
|
return ApiResponse(
|
|
success=True,
|
|
message="数据库健康检查完成",
|
|
data={
|
|
"database_available": is_available,
|
|
"connection_type": "mysql" if is_available else "mock_data",
|
|
"status": "healthy" if is_available else "using_fallback"
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"数据库健康检查失败: {e}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=ApiResponse(
|
|
success=False,
|
|
message="数据库健康检查失败",
|
|
error=str(e)
|
|
).dict()
|
|
) |