#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 热点数据 API 路由 """ import logging from typing import List, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from domain.hotspot import get_hotspot_manager, HotTopic, HotTopicSource logger = logging.getLogger(__name__) router = APIRouter(prefix="/hotspot", tags=["热点数据"]) # ==================== 响应模型 ==================== class HotTopicResponse(BaseModel): """热点话题响应""" title: str source: str rank: Optional[int] = None heat: Optional[int] = None category: str url: Optional[str] = None description: Optional[str] = None tags: List[str] = [] extra: Optional[dict] = None # 额外数据 (如 tab 来源) @classmethod def from_model(cls, topic: HotTopic) -> 'HotTopicResponse': return cls( title=topic.title, source=topic.source.value, rank=topic.rank, heat=topic.heat, category=topic.category.value, url=topic.url, description=topic.description, tags=topic.tags, extra=topic.extra, ) class HotTopicListResponse(BaseModel): """热点列表响应""" success: bool = True source: str count: int topics: List[HotTopicResponse] class AllHotTopicsResponse(BaseModel): """所有热点响应""" success: bool = True sources: dict # source -> list of topics class AddTopicRequest(BaseModel): """添加自定义热点请求""" title: str description: Optional[str] = None tags: List[str] = [] category: str = "other" # ==================== API 路由 ==================== @router.get("/all", response_model=AllHotTopicsResponse) async def get_all_hotspots(force: bool = Query(False, description="强制刷新缓存")): """ 获取所有来源的热点数据 """ try: manager = get_hotspot_manager() results = await manager.fetch_all(force=force) # 转换为响应格式 sources = {} for source, topics in results.items(): sources[source] = [HotTopicResponse.from_model(t).dict() for t in topics] return AllHotTopicsResponse(sources=sources) except Exception as e: logger.error(f"获取热点失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/weibo", response_model=HotTopicListResponse) async def get_weibo_hotspots( limit: int = Query(50, ge=1, le=100), force: bool = Query(False) ): """获取微博热搜""" try: manager = get_hotspot_manager() topics = await manager.fetch_source(HotTopicSource.WEIBO, force=force) return HotTopicListResponse( source="weibo", count=len(topics[:limit]), topics=[HotTopicResponse.from_model(t) for t in topics[:limit]] ) except Exception as e: logger.error(f"获取微博热搜失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/baidu", response_model=HotTopicListResponse) async def get_baidu_hotspots( limit: int = Query(50, ge=1, le=100), force: bool = Query(False) ): """获取百度热搜""" try: manager = get_hotspot_manager() topics = await manager.fetch_source(HotTopicSource.BAIDU, force=force) return HotTopicListResponse( source="baidu", count=len(topics[:limit]), topics=[HotTopicResponse.from_model(t) for t in topics[:limit]] ) except Exception as e: logger.error(f"获取百度热搜失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/calendar", response_model=HotTopicListResponse) async def get_calendar_events( days: int = Query(30, ge=1, le=90, description="查询未来多少天的节日") ): """获取近期节日""" try: manager = get_hotspot_manager() topics = await manager.get_festivals(days=days) return HotTopicListResponse( source="calendar", count=len(topics), topics=[HotTopicResponse.from_model(t) for t in topics] ) except Exception as e: logger.error(f"获取节日数据失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/travel", response_model=HotTopicListResponse) async def get_travel_hotspots( limit: int = Query(10, ge=1, le=50) ): """获取旅游相关热点""" try: manager = get_hotspot_manager() topics = await manager.get_travel_topics(limit=limit) return HotTopicListResponse( source="travel", count=len(topics), topics=[HotTopicResponse.from_model(t) for t in topics] ) except Exception as e: logger.error(f"获取旅游热点失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/trending", response_model=HotTopicListResponse) async def get_trending( limit: int = Query(20, ge=1, le=100) ): """获取热门话题 (所有来源合并去重)""" try: manager = get_hotspot_manager() topics = await manager.get_trending(limit=limit) return HotTopicListResponse( source="trending", count=len(topics), topics=[HotTopicResponse.from_model(t) for t in topics] ) except Exception as e: logger.error(f"获取热门话题失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/xiaohongshu", response_model=HotTopicListResponse) async def get_xiaohongshu_hotspots( limit: int = Query(20, ge=1, le=50), force: bool = Query(False) ): """获取小红书热门话题 (文旅相关)""" try: manager = get_hotspot_manager() topics = await manager.fetch_source(HotTopicSource.XIAOHONGSHU, force=force) return HotTopicListResponse( source="xiaohongshu", count=len(topics[:limit]), topics=[HotTopicResponse.from_model(t) for t in topics[:limit]] ) except Exception as e: logger.error(f"获取小红书热门失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/bing", response_model=HotTopicListResponse) async def get_bing_hotspots( limit: int = Query(20, ge=1, le=50), force: bool = Query(False) ): """获取 Bing 搜索建议 (旅游相关)""" try: manager = get_hotspot_manager() topics = await manager.fetch_source(HotTopicSource.BING, force=force) return HotTopicListResponse( source="bing", count=len(topics[:limit]), topics=[HotTopicResponse.from_model(t) for t in topics[:limit]] ) except Exception as e: logger.error(f"获取 Bing 热搜失败: {e}") raise HTTPException(status_code=500, detail=str(e)) # ==================== 自定义热点管理 ==================== @router.get("/custom", response_model=HotTopicListResponse) async def get_custom_topics(): """获取自定义热点""" manager = get_hotspot_manager() topics = manager.get_custom_topics() return HotTopicListResponse( source="custom", count=len(topics), topics=[HotTopicResponse.from_model(t) for t in topics] ) @router.post("/custom") async def add_custom_topic(request: AddTopicRequest): """添加自定义热点""" from domain.hotspot.models import HotTopicCategory try: category = HotTopicCategory(request.category) except ValueError: category = HotTopicCategory.OTHER topic = HotTopic( title=request.title, source=HotTopicSource.CUSTOM, description=request.description, tags=request.tags, category=category, ) manager = get_hotspot_manager() manager.add_custom_topic(topic) return {"success": True, "message": f"已添加热点: {request.title}"} @router.delete("/custom/{title}") async def remove_custom_topic(title: str): """删除自定义热点""" manager = get_hotspot_manager() success = manager.remove_custom_topic(title) if success: return {"success": True, "message": f"已删除热点: {title}"} else: raise HTTPException(status_code=404, detail=f"热点不存在: {title}")