"""
Browser management utilities.

Playwright-based browser instance management and anti-detection (stealth)
script injection.
"""

import asyncio
import os
from pathlib import Path
from typing import Optional, Dict, Any, List

from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright

from ..config.settings import settings
from .exceptions import BrowserError, TimeoutError
from .logger import get_logger

logger = get_logger(__name__)


class BrowserManager:
    """Manage shared Playwright browsers, contexts and pages.

    Browsers and contexts are cached in dictionaries keyed by a string id so
    repeated requests with the same parameters reuse the same instance. All
    cache mutations performed while creating instances happen under
    ``self._lock``.
    """

    def __init__(self):
        # Playwright driver handle; created lazily by ``initialize``.
        self.playwright: Optional[Playwright] = None
        # Cache of launched browsers, keyed by the id built in ``get_browser``.
        self.browsers: Dict[str, Browser] = {}
        # Cache of contexts, keyed by the id built in ``get_context``.
        self.contexts: Dict[str, BrowserContext] = {}
        # JavaScript source injected into every new page (None until loaded).
        self.stealth_script: Optional[str] = None
        # NOTE: asyncio.Lock is not re-entrant; never call a method that
        # acquires it while already holding it.
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Start Playwright and load the stealth script if not done yet."""
        async with self._lock:
            if self.playwright is None:
                self.playwright = await async_playwright().start()
                await self._load_stealth_script()
                logger.info("浏览器管理器初始化完成")

    async def _load_stealth_script(self):
        """Load the stealth script from ``assets/stealth.min.js``.

        Falls back to the built-in basic script when the file is missing or
        cannot be read.
        """
        try:
            script_path = Path(__file__).parent.parent / "assets" / "stealth.min.js"
            if script_path.exists():
                self.stealth_script = script_path.read_text(encoding='utf-8')
                logger.debug("反检测脚本加载成功")
            else:
                # No bundled script on disk: use the built-in basic one.
                self.stealth_script = self._get_basic_stealth_script()
                logger.debug("使用内置反检测脚本")
        except Exception as e:
            logger.warning(f"反检测脚本加载失败: {e}")
            self.stealth_script = self._get_basic_stealth_script()

    def _get_basic_stealth_script(self) -> str:
        """Return the built-in basic stealth script (JavaScript source).

        The script hides ``navigator.webdriver``, fakes ``plugins`` and
        ``languages``, defines ``window.chrome`` and overrides
        ``permissions.query`` for notifications. The original ``query`` is
        bound to ``navigator.permissions`` because calling the detached
        method would throw "Illegal invocation".
        """
        return """
        // 基本反检测脚本
        (() => {
            // 移除webdriver属性
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
            });

            // 修改plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5],
            });

            // 修改languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en'],
            });

            // 添加chrome属性
            window.chrome = {
                runtime: {},
            };

            // 重写permission query
            const originalQuery = window.navigator.permissions.query.bind(
                window.navigator.permissions
            );
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        })();
        """

    async def get_browser(
        self,
        headless: Optional[bool] = None,
        user_data_dir: Optional[str] = None,
        proxy: Optional[Dict[str, Any]] = None
    ) -> Browser:
        """Return a cached browser for the given parameters, launching if needed.

        Args:
            headless: Run headless; defaults to ``settings.browser.headless``.
            user_data_dir: Only used as part of the cache key here.
            proxy: Playwright proxy dict; defaults to ``settings.browser.proxy``
                wrapped as ``{"server": ...}`` when that setting is set.

        Returns:
            A connected ``Browser`` instance.

        Raises:
            BrowserError: If the browser cannot be launched.
        """
        await self.initialize()

        headless = headless if headless is not None else settings.browser.headless
        proxy = proxy or ({"server": settings.browser.proxy} if settings.browser.proxy else None)

        # Build the cache key. The proxy is part of the key so that requests
        # for different proxies never share one browser process.
        browser_id = f"headless_{headless}_{user_data_dir or 'default'}"
        if proxy:
            browser_id += f"_proxy_{proxy.get('server')}"

        async with self._lock:
            cached = self.browsers.get(browser_id)
            if cached is None or not cached.is_connected():
                if cached is not None:
                    # The old browser is gone; its cached context is unusable.
                    self.contexts.pop(self._browser_context_id(cached), None)
                try:
                    browser = await self.playwright.chromium.launch(
                        headless=headless,
                        args=self._get_browser_args(headless),
                        proxy=proxy,
                        executable_path=settings.browser.executable_path
                    )
                except Exception as e:
                    raise BrowserError(f"浏览器启动失败: {e}") from e
                self.browsers[browser_id] = browser
                logger.info(f"浏览器实例创建成功: {browser_id}")

            return self.browsers[browser_id]

    def _get_browser_args(self, headless: Optional[bool] = None) -> List[str]:
        """Return the Chromium command-line arguments used at launch.

        Args:
            headless: Whether the browser being launched is headless. When
                ``None`` the value of ``settings.browser.headless`` is used.

        Returns:
            List of command-line switches.
        """
        if headless is None:
            headless = settings.browser.headless

        args = [
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-infobars',
            '--window-position=0,0',
            '--ignore-certificate-errors',
            '--ignore-certificate-errors-spki-list',
            '--disable-blink-features=AutomationControlled',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-images',  # can be toggled as needed
            # '--disable-javascript' is intentionally NOT passed: pages and
            # the stealth script both need JavaScript.
            # '--enable-automation' is intentionally NOT passed: it
            # contradicts the anti-detection purpose of this argument list.
            '--disable-default-apps',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--disable-background-networking',
            '--disable-ipc-flooding-protection',
            '--password-store=basic',
            '--use-mock-keychain',
            '--no-first-run',
            '--no-default-browser-check',
            '--disable-fre',
            # Single merged switch: repeated --disable-features flags do not
            # accumulate (NOTE(review): last occurrence wins in Chromium).
            '--disable-features=VizDisplayCompositor,TranslateUI',
        ]

        if not headless:
            args.extend([
                '--start-maximized',
                '--disable-web-security',
            ])

        return args

    @staticmethod
    def _browser_context_id(browser: Browser) -> str:
        """Return the context cache key for the shared context of ``browser``."""
        # id() is unique per live object, unlike the object's string form.
        return f"{id(browser)}_default"

    async def _launch_persistent_context(
        self,
        user_data_dir: str,
        context_options: Dict[str, Any]
    ) -> BrowserContext:
        """Launch a persistent context storing its profile in ``user_data_dir``."""
        headless = settings.browser.headless
        proxy = {"server": settings.browser.proxy} if settings.browser.proxy else None
        return await self.playwright.chromium.launch_persistent_context(
            user_data_dir,
            headless=headless,
            args=self._get_browser_args(headless),
            proxy=proxy,
            executable_path=settings.browser.executable_path,
            **context_options
        )

    async def get_context(
        self,
        browser: Optional[Browser] = None,
        user_data_dir: Optional[str] = None,
        viewport: Optional[Dict[str, int]] = None,
        user_agent: Optional[str] = None
    ) -> BrowserContext:
        """Return a cached browser context, creating it if needed.

        Args:
            browser: Browser to create the context in; defaults to
                ``get_browser()``. Ignored when ``user_data_dir`` is given.
            user_data_dir: Profile directory. ``Browser.new_context`` has no
                such option, so a persistent context is launched instead
                (this starts its own browser using the global settings).
            viewport: ``{"width": ..., "height": ...}``; defaults to the
                configured viewport size.
            user_agent: User agent override; defaults to
                ``settings.browser.user_agent`` when that is set.

        Returns:
            The cached or newly created ``BrowserContext``. Note that the
            viewport/user agent only apply when the context is first created.

        Raises:
            BrowserError: If the context cannot be created.
        """
        if user_data_dir:
            await self.initialize()
            context_id = f"persistent_{user_data_dir}"
        else:
            if browser is None:
                browser = await self.get_browser()
            context_id = self._browser_context_id(browser)

        async with self._lock:
            if context_id not in self.contexts:
                context_options = {
                    'viewport': viewport or {
                        'width': settings.browser.viewport_width,
                        'height': settings.browser.viewport_height
                    },
                    'locale': settings.browser.locale,
                    'timezone_id': settings.browser.timezone,
                }

                effective_user_agent = user_agent or settings.browser.user_agent
                if effective_user_agent:
                    context_options['user_agent'] = effective_user_agent

                try:
                    if user_data_dir:
                        context = await self._launch_persistent_context(
                            user_data_dir, context_options
                        )
                    else:
                        context = await browser.new_context(**context_options)
                except Exception as e:
                    raise BrowserError(f"浏览器上下文创建失败: {e}") from e

                self.contexts[context_id] = context
                logger.debug(f"浏览器上下文创建成功: {context_id}")

            return self.contexts[context_id]

    async def get_page(
        self,
        context: Optional[BrowserContext] = None,
        browser: Optional[Browser] = None
    ) -> Page:
        """Create a new page with default timeout, stealth script and listeners.

        Args:
            context: Context to open the page in; defaults to
                ``get_context(browser)``.
            browser: Browser used to resolve the default context.

        Returns:
            The newly created ``Page``.

        Raises:
            BrowserError: If the page cannot be created or configured.
        """
        if context is None:
            context = await self.get_context(browser)

        try:
            page = await context.new_page()

            # Apply the configured default timeout.
            page.set_default_timeout(settings.browser.timeout)

            # Register the stealth script so it runs on every navigation.
            if self.stealth_script:
                await page.add_init_script(self.stealth_script)

            # Log page errors and failed requests.
            page.on("pageerror", self._handle_page_error)
            page.on("requestfailed", self._handle_request_failed)

            logger.debug("新页面创建成功")
            return page
        except Exception as e:
            raise BrowserError(f"页面创建失败: {e}") from e

    async def _handle_page_error(self, error):
        """Log an error reported by the page."""
        logger.warning(f"页面错误: {error}")

    async def _handle_request_failed(self, request):
        """Log a failed network request."""
        url = request.url
        failure = request.failure
        logger.debug(f"请求失败: {url} - {failure}")

    async def inject_stealth_script(self, page: Page):
        """Evaluate the stealth script in the page's current document.

        Failures are logged as warnings and not raised.
        """
        if not self.stealth_script:
            return
        try:
            await page.evaluate(self.stealth_script)
            logger.debug("反检测脚本注入成功")
        except Exception as e:
            logger.warning(f"反检测脚本注入失败: {e}")

    async def take_screenshot(self, page: Page, file_path: str, full_page: bool = True):
        """Save a screenshot of ``page`` to ``file_path``.

        Failures are logged as errors and not raised.
        """
        try:
            await page.screenshot(path=file_path, full_page=full_page)
            logger.info(f"页面截图保存成功: {file_path}")
        except Exception as e:
            logger.error(f"页面截图失败: {e}")

    async def wait_for_network_idle(self, page: Page, timeout: float = 5000):
        """Wait until the page reaches the ``networkidle`` load state.

        Args:
            page: Page to wait on.
            timeout: Maximum wait in milliseconds.

        Raises:
            TimeoutError: (project exception) if the wait fails for any reason.
        """
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout)
        except Exception as e:
            # Chain the cause so the underlying failure stays visible.
            raise TimeoutError(f"等待网络空闲超时: {timeout}ms", timeout=timeout) from e

    async def close_page(self, page: Page):
        """Close ``page``; failures are logged as warnings and not raised."""
        try:
            await page.close()
            logger.debug("页面关闭成功")
        except Exception as e:
            logger.warning(f"页面关闭失败: {e}")

    async def close_context(self, context: BrowserContext):
        """Remove ``context`` from the cache and close it.

        Failures are logged as warnings and not raised.
        """
        try:
            # Drop the cache entry (if any) before closing.
            context_id = next(
                (cid for cid, ctx in self.contexts.items() if ctx == context),
                None
            )
            if context_id:
                del self.contexts[context_id]

            await context.close()
            logger.debug("浏览器上下文关闭成功")
        except Exception as e:
            logger.warning(f"浏览器上下文关闭失败: {e}")

    async def close_browser(self, browser: Browser):
        """Remove ``browser`` (and its cached context) from the caches and close it.

        Failures are logged as warnings and not raised.
        """
        try:
            # Drop the cache entry (if any) before closing.
            browser_id = next(
                (bid for bid, br in self.browsers.items() if br == browser),
                None
            )
            if browser_id:
                del self.browsers[browser_id]

            # The browser's context dies with it; do not keep handing it out.
            self.contexts.pop(self._browser_context_id(browser), None)

            await browser.close()
            logger.debug("浏览器关闭成功")
        except Exception as e:
            logger.warning(f"浏览器关闭失败: {e}")

    async def cleanup(self):
        """Close all contexts and browsers and stop Playwright.

        Cleanup is best-effort: individual failures are logged and do not
        stop the remaining resources from being released.
        """
        async with self._lock:
            # Close every cached context.
            for context in list(self.contexts.values()):
                try:
                    await context.close()
                except Exception as e:
                    logger.warning(f"浏览器上下文关闭失败: {e}")
            self.contexts.clear()

            # Close every cached browser.
            for browser in list(self.browsers.values()):
                try:
                    await browser.close()
                except Exception as e:
                    logger.warning(f"浏览器关闭失败: {e}")
            self.browsers.clear()

            # Stop the Playwright driver.
            if self.playwright:
                try:
                    await self.playwright.stop()
                except Exception as e:
                    logger.warning(f"Playwright停止失败: {e}")
                self.playwright = None

            logger.info("浏览器管理器资源清理完成")


# Global browser manager instance
browser_manager = BrowserManager()