#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Baidu hot-search crawler.

Data source: https://top.baidu.com/board?tab=realtime
"""

import logging
import re
from typing import List, Optional
from datetime import datetime, timedelta
from urllib.parse import quote

from .base import BaseCrawler
from ..models import HotTopic, HotTopicSource, HotTopicCategory

logger = logging.getLogger(__name__)


class BaiduCrawler(BaseCrawler):
    """Crawler for the Baidu hot-search boards.

    Each configured tab is requested from the Baidu board API and every
    entry of the first returned card is converted into a ``HotTopic``.

    NOTE(review): ``self.logger`` and ``self._get_session()`` are not defined
    in this file; presumably they are provided by ``BaseCrawler`` - confirm.
    """

    source = HotTopicSource.BAIDU
    name = "百度热搜"

    # Baidu board API endpoints, keyed by tab name (multiple boards supported).
    API_URLS = {
        'realtime': 'https://top.baidu.com/api/board?platform=wise&tab=realtime',  # real-time hot topics
        'travel': 'https://top.baidu.com/api/board?platform=wise&tab=travel',  # travel board ⭐
        'novel': 'https://top.baidu.com/api/board?platform=wise&tab=novel',  # novel board
        'movie': 'https://top.baidu.com/api/board?platform=wise&tab=movie',  # movie board
        'teleplay': 'https://top.baidu.com/api/board?platform=wise&tab=teleplay',  # TV series board
        'car': 'https://top.baidu.com/api/board?platform=wise&tab=car',  # car board
        'game': 'https://top.baidu.com/api/board?platform=wise&tab=game',  # game board
    }

    # Substrings that classify a topic as travel / food related.
    # Kept at class level so they are not rebuilt on every call.
    _TRAVEL_KEYWORDS = ('旅游', '旅行', '景区', '景点', '酒店', '度假', '出游')
    _FOOD_KEYWORDS = ('美食', '餐厅', '小吃', '火锅')

    # Lifetime of a fetched topic before it is considered expired.
    _TOPIC_TTL = timedelta(hours=2)

    def __init__(self, tabs: Optional[List[str]] = None):
        """Create the crawler.

        Args:
            tabs: Names of the boards to fetch (keys of ``API_URLS``).
                When omitted or empty, the real-time and travel boards
                are fetched.
        """
        super().__init__()
        self.tabs = tabs or ['realtime', 'travel']

    async def fetch(self) -> List[HotTopic]:
        """Fetch every configured board and return the combined topics.

        Returns:
            All topics parsed from the configured tabs, in tab order.
            Errors are logged rather than raised; whatever was collected
            before a failure is still returned.
        """
        all_topics: List[HotTopic] = []

        try:
            session = await self._get_session()

            for tab in self.tabs:
                url = self.API_URLS.get(tab)
                if not url:
                    # Surface typos in the tab configuration instead of
                    # dropping the board silently.
                    self.logger.warning(f"未知的百度榜单, 已跳过: {tab}")
                    continue

                topics = await self._fetch_tab(session, url, tab)
                all_topics.extend(topics)

            self.logger.info(f"获取到 {len(all_topics)} 条百度热搜")

        except Exception as e:
            # Top-level boundary of the crawler: log and return what we have.
            self.logger.error(f"获取百度热搜失败: {e}")

        return all_topics

    async def _fetch_tab(self, session, url: str, tab: str) -> List[HotTopic]:
        """Fetch a single board and parse its entries.

        Args:
            session: HTTP session object exposing ``get(url)`` as an async
                context manager whose response has ``status`` and ``json()``.
            url: API endpoint of the board.
            tab: Board name, stored on each topic and used in log messages.

        Returns:
            The parsed topics; an empty list on a non-200 response, an
            empty payload, or any error (which is logged).
        """
        topics: List[HotTopic] = []

        try:
            async with session.get(url) as response:
                if response.status != 200:
                    self.logger.error(f"请求 {tab} 失败: {response.status}")
                    return []

                data = await response.json()

            # ``or`` guards against keys that are present but null, which
            # ``dict.get(key, default)`` would pass through as None.
            payload = data.get('data') or {}
            cards = payload.get('cards') or []
            if not cards:
                return topics

            # Only the first card is read; ranks are 1-based positions
            # within that card's content list.
            content = cards[0].get('content') or []
            for rank, item in enumerate(content, start=1):
                topic = self._parse_item(item, rank, tab)
                if topic is not None:
                    topics.append(topic)

        except Exception as e:
            self.logger.error(f"获取 {tab} 榜单失败: {e}")

        return topics

    @staticmethod
    def _to_int(value) -> int:
        """Coerce a raw heat value to ``int``, falling back to 0."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _parse_item(self, item: dict, rank: int, tab: str = 'realtime') -> Optional[HotTopic]:
        """Convert one raw board entry into a ``HotTopic``.

        Args:
            item: Raw entry from the board payload.
            rank: 1-based position of the entry on its board.
            tab: Name of the board the entry came from.

        Returns:
            The parsed topic, or ``None`` when the entry has no title or
            cannot be parsed (the failure is logged as a warning).
        """
        try:
            word = item.get('word', '') or item.get('query', '')
            if not word:
                return None

            # Heat score: always an integer, 0 when missing or malformed.
            hot_score = self._to_int(item.get('hotScore', 0))

            desc = item.get('desc', '')
            category = self._detect_category(word, desc)

            # Fall back to a search URL when the entry has no usable link;
            # the keyword is percent-encoded so spaces, '&', '#', etc. do
            # not corrupt the query string.
            url = item.get('url') or f"https://www.baidu.com/s?wd={quote(word, safe='')}"

            # Take the timestamp once so expires_at is exactly
            # fetched_at + TTL.
            now = datetime.now()

            return HotTopic(
                title=word,
                source=self.source,
                rank=rank,
                heat=hot_score,
                category=category,
                url=url,
                description=desc,
                fetched_at=now,
                expires_at=now + self._TOPIC_TTL,
                extra={
                    'img': item.get('img', ''),
                    'show': item.get('show', []),
                    'tab': tab,  # originating board
                }
            )
        except Exception as e:
            # Best-effort parsing: one bad entry must not drop the board.
            self.logger.warning(f"解析热搜失败: {e}")
            return None

    def _detect_category(self, title: str, desc: str) -> HotTopicCategory:
        """Classify a topic by keyword match on its title and description.

        Travel keywords are checked before food keywords; anything that
        matches neither is classified as ``TRENDING``.
        """
        text = f"{title} {desc}"

        if any(kw in text for kw in self._TRAVEL_KEYWORDS):
            return HotTopicCategory.TRAVEL

        if any(kw in text for kw in self._FOOD_KEYWORDS):
            return HotTopicCategory.FOOD

        return HotTopicCategory.TRENDING