2025-11-12 00:28:07 +08:00

337 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
浏览器管理工具
基于Playwright的浏览器实例管理和反检测脚本注入。
"""
import asyncio
import os
from pathlib import Path
from typing import Optional, Dict, Any, List
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from ..config.settings import settings
from .exceptions import BrowserError, TimeoutError
from .logger import get_logger
# Logger for this module, obtained from the project's get_logger helper
# using this module's __name__.
logger = get_logger(__name__)
class BrowserManager:
    """Manage Playwright browsers, contexts and pages with stealth injection.

    Browsers and contexts are cached in dictionaries keyed by a string id so
    that repeated requests with the same parameters reuse the same instance.
    Creation and teardown of cached objects is serialised with an
    ``asyncio.Lock``.
    """

    def __init__(self):
        """Create an empty manager; Playwright is started lazily by ``initialize``."""
        self.playwright: Optional[Playwright] = None
        # Cache of launched browsers, keyed by the id built in get_browser().
        self.browsers: Dict[str, Browser] = {}
        # Cache of contexts, keyed by the id built in get_context().
        self.contexts: Dict[str, BrowserContext] = {}
        # JavaScript source injected into every page created by get_page().
        self.stealth_script: Optional[str] = None
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Start Playwright and load the stealth script (idempotent)."""
        async with self._lock:
            if self.playwright is not None:
                return
            self.playwright = await async_playwright().start()
            await self._load_stealth_script()
            logger.info("浏览器管理器初始化完成")

    async def _load_stealth_script(self):
        """Load ``assets/stealth.min.js`` or fall back to the built-in script.

        The file is looked up two directories above this module. Any I/O or
        decoding failure is logged and the built-in script is used instead.
        """
        script_path = Path(__file__).parent.parent / "assets" / "stealth.min.js"
        try:
            if script_path.exists():
                self.stealth_script = script_path.read_text(encoding="utf-8")
                logger.debug("反检测脚本加载成功")
            else:
                # No local script shipped: use the minimal built-in one.
                self.stealth_script = self._get_basic_stealth_script()
                logger.debug("使用内置反检测脚本")
        except (OSError, UnicodeError) as e:
            logger.warning(f"反检测脚本加载失败: {e}")
            self.stealth_script = self._get_basic_stealth_script()

    def _get_basic_stealth_script(self) -> str:
        """Return the built-in JavaScript used when no stealth file is available.

        The script hides ``navigator.webdriver``, fakes ``plugins`` and
        ``languages``, defines ``window.chrome`` and patches
        ``navigator.permissions.query`` for the ``notifications`` permission.
        """
        # The original query function is bound to the Permissions object:
        # calling the detached native method would fail with an illegal
        # invocation error. The patch is skipped entirely when the
        # Permissions API is not exposed, so the rest of the script still runs.
        return """
        // 基本反检测脚本
        (() => {
            // 移除webdriver属性
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
            });
            // 修改plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5],
            });
            // 修改languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en'],
            });
            // 添加chrome属性
            window.chrome = {
                runtime: {},
            };
            // 重写permission query
            const permissions = window.navigator.permissions;
            if (permissions && permissions.query) {
                const originalQuery = permissions.query.bind(permissions);
                permissions.query = (parameters) => (
                    parameters.name === 'notifications' ?
                        Promise.resolve({ state: Notification.permission }) :
                        originalQuery(parameters)
                );
            }
        })();
        """

    async def get_browser(
        self,
        headless: Optional[bool] = None,
        user_data_dir: Optional[str] = None,
        proxy: Optional[Dict[str, Any]] = None
    ) -> Browser:
        """Return a cached Chromium browser, launching one if needed.

        Args:
            headless: Headless mode; defaults to ``settings.browser.headless``.
            user_data_dir: Only used to partition the browser cache here.
            proxy: Playwright proxy dict; defaults to
                ``{"server": settings.browser.proxy}`` when that is set.

        Returns:
            A connected ``Browser`` instance.

        Raises:
            BrowserError: If Chromium fails to launch.
        """
        await self.initialize()
        if headless is None:
            headless = settings.browser.headless
        if not proxy:
            proxy = {"server": settings.browser.proxy} if settings.browser.proxy else None

        browser_id = f"headless_{headless}_{user_data_dir or 'default'}"
        if proxy:
            # Browsers launched with different proxies must not share a slot.
            browser_id += f"_proxy_{proxy.get('server')}"

        async with self._lock:
            cached = self.browsers.get(browser_id)
            if cached is None or not cached.is_connected():
                # Use the effective headless value, not only the global setting.
                browser_args = self._get_browser_args(headless)
                try:
                    browser = await self.playwright.chromium.launch(
                        headless=headless,
                        args=browser_args,
                        proxy=proxy,
                        executable_path=settings.browser.executable_path
                    )
                except Exception as e:
                    raise BrowserError(f"浏览器启动失败: {e}") from e
                self.browsers[browser_id] = browser
                logger.info(f"浏览器实例创建成功: {browser_id}")
            return self.browsers[browser_id]

    def _get_browser_args(self, headless: Optional[bool] = None) -> List[str]:
        """Build the Chromium command-line switches.

        Args:
            headless: Effective headless mode; defaults to
                ``settings.browser.headless`` when omitted.

        Returns:
            The list of switches; headed mode gets two extra switches.
        """
        if headless is None:
            headless = settings.browser.headless
        args = [
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-infobars',
            '--window-position=0,0',
            # Spelling fixed: the former "certifcate" variants were not
            # recognised switches.
            '--ignore-certificate-errors',
            '--ignore-certificate-errors-spki-list',
            '--disable-blink-features=AutomationControlled',
            # Single comma-separated switch: when a switch is repeated only
            # the last occurrence is used, which previously dropped TranslateUI.
            '--disable-features=VizDisplayCompositor,TranslateUI',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-images',  # optional; toggle as needed
            # NOTE: '--disable-javascript' is intentionally not passed; pages
            # and the stealth script need JavaScript to run.
            '--disable-default-apps',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--disable-background-networking',
            '--disable-ipc-flooding-protection',
            '--enable-automation',
            '--password-store=basic',
            '--use-mock-keychain',
            '--no-first-run',
            '--no-default-browser-check',
            '--disable-fre',
        ]
        if not headless:
            args.extend([
                '--start-maximized',
                '--disable-web-security',
            ])
        return args

    async def get_context(
        self,
        browser: Optional[Browser] = None,
        user_data_dir: Optional[str] = None,
        viewport: Optional[Dict[str, int]] = None,
        user_agent: Optional[str] = None
    ) -> BrowserContext:
        """Return a cached browser context, creating one if needed.

        Args:
            browser: Browser to create the context in; defaults to
                ``get_browser()``. Ignored when ``user_data_dir`` is given,
                because a persistent context launches its own browser.
            user_data_dir: Profile directory. When set, a persistent context
                is launched for it (``new_context`` has no such option).
            viewport: ``{"width": ..., "height": ...}``; defaults to settings.
            user_agent: User agent override; defaults to settings.

        Returns:
            The cached or newly created ``BrowserContext``.

        Raises:
            BrowserError: If the context cannot be created.
        """
        if user_data_dir:
            await self.initialize()
            context_id = f"persistent_{user_data_dir}"
        else:
            if browser is None:
                browser = await self.get_browser()
            # id() is unique per live browser object; the textual repr of a
            # browser is not guaranteed to be.
            context_id = f"{id(browser)}_default"

        async with self._lock:
            if context_id not in self.contexts:
                context_options = {
                    'viewport': viewport or {
                        'width': settings.browser.viewport_width,
                        'height': settings.browser.viewport_height
                    },
                    'locale': settings.browser.locale,
                    'timezone_id': settings.browser.timezone,
                }
                effective_user_agent = user_agent or settings.browser.user_agent
                if effective_user_agent:
                    context_options['user_agent'] = effective_user_agent
                try:
                    if user_data_dir:
                        context = await self._launch_persistent_context(
                            user_data_dir, context_options
                        )
                    else:
                        context = await browser.new_context(**context_options)
                except Exception as e:
                    raise BrowserError(f"浏览器上下文创建失败: {e}") from e
                self.contexts[context_id] = context
                logger.debug(f"浏览器上下文创建成功: {context_id}")
            return self.contexts[context_id]

    async def _launch_persistent_context(
        self,
        user_data_dir: str,
        context_options: Dict[str, Any]
    ) -> BrowserContext:
        """Launch a Chromium persistent context bound to ``user_data_dir``."""
        proxy = {"server": settings.browser.proxy} if settings.browser.proxy else None
        return await self.playwright.chromium.launch_persistent_context(
            user_data_dir,
            headless=settings.browser.headless,
            args=self._get_browser_args(),
            proxy=proxy,
            executable_path=settings.browser.executable_path,
            **context_options
        )

    async def get_page(
        self,
        context: Optional[BrowserContext] = None,
        browser: Optional[Browser] = None
    ) -> Page:
        """Open a new page with default timeout, stealth script and listeners.

        Args:
            context: Context to open the page in; defaults to
                ``get_context(browser)``.
            browser: Browser used to resolve the default context.

        Returns:
            The configured ``Page``.

        Raises:
            BrowserError: If the page cannot be created or configured.
        """
        if context is None:
            context = await self.get_context(browser)
        page = None
        try:
            page = await context.new_page()
            page.set_default_timeout(settings.browser.timeout)
            if self.stealth_script:
                # Init scripts run before page scripts on every navigation.
                await page.add_init_script(self.stealth_script)
            page.on("pageerror", self._handle_page_error)
            page.on("requestfailed", self._handle_request_failed)
            logger.debug("新页面创建成功")
            return page
        except Exception as e:
            if page is not None:
                # Do not leak a half-configured page.
                try:
                    await page.close()
                except Exception as close_error:
                    logger.debug(f"页面关闭失败: {close_error}")
            raise BrowserError(f"页面创建失败: {e}") from e

    async def _handle_page_error(self, error):
        """Log an uncaught error reported by the page."""
        logger.warning(f"页面错误: {error}")

    async def _handle_request_failed(self, request):
        """Log the URL and failure reason of a failed request."""
        logger.debug(f"请求失败: {request.url} - {request.failure}")

    async def inject_stealth_script(self, page: Page):
        """Evaluate the stealth script in ``page``; failures are only logged."""
        if not self.stealth_script:
            return
        try:
            await page.evaluate(self.stealth_script)
            logger.debug("反检测脚本注入成功")
        except Exception as e:
            logger.warning(f"反检测脚本注入失败: {e}")

    async def take_screenshot(self, page: Page, file_path: str, full_page: bool = True):
        """Save a screenshot of ``page`` to ``file_path``; failures are only logged."""
        try:
            await page.screenshot(path=file_path, full_page=full_page)
            logger.info(f"页面截图保存成功: {file_path}")
        except Exception as e:
            logger.error(f"页面截图失败: {e}")

    async def wait_for_network_idle(self, page: Page, timeout: float = 5000):
        """Wait until ``page`` reaches the ``networkidle`` load state.

        Args:
            page: Page to wait on.
            timeout: Maximum wait, passed to Playwright (milliseconds).

        Raises:
            TimeoutError: The project's timeout error, for any failure while
                waiting; the original exception is chained as the cause.
        """
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout)
        except Exception as e:
            raise TimeoutError(f"等待网络空闲超时: {timeout}ms", timeout=timeout) from e

    async def close_page(self, page: Page):
        """Close ``page``; failures are only logged."""
        try:
            await page.close()
            logger.debug("页面关闭成功")
        except Exception as e:
            logger.warning(f"页面关闭失败: {e}")

    async def close_context(self, context: BrowserContext):
        """Drop ``context`` from the cache and close it; failures are only logged."""
        try:
            for cached_id, cached in list(self.contexts.items()):
                if cached is context:
                    del self.contexts[cached_id]
                    break
            await context.close()
            logger.debug("浏览器上下文关闭成功")
        except Exception as e:
            logger.warning(f"浏览器上下文关闭失败: {e}")

    async def close_browser(self, browser: Browser):
        """Drop ``browser`` (and its cached contexts) and close it.

        Failures are only logged.
        """
        try:
            for cached_id, cached in list(self.browsers.items()):
                if cached is browser:
                    del self.browsers[cached_id]
                    break
            # Contexts of a closed browser are unusable; forget them so that
            # get_context() never hands one out again.
            for cached_id, cached_context in list(self.contexts.items()):
                if cached_context.browser is browser:
                    del self.contexts[cached_id]
            await browser.close()
            logger.debug("浏览器关闭成功")
        except Exception as e:
            logger.warning(f"浏览器关闭失败: {e}")

    async def cleanup(self):
        """Close every cached context and browser, then stop Playwright.

        Each close is best-effort: failures are logged and the remaining
        resources are still released. Only ``Exception`` is caught, so task
        cancellation is not swallowed.
        """
        async with self._lock:
            for context in list(self.contexts.values()):
                try:
                    await context.close()
                except Exception as e:
                    logger.debug(f"浏览器上下文关闭失败: {e}")
            self.contexts.clear()

            for browser in list(self.browsers.values()):
                try:
                    await browser.close()
                except Exception as e:
                    logger.debug(f"浏览器关闭失败: {e}")
            self.browsers.clear()

            if self.playwright:
                try:
                    await self.playwright.stop()
                except Exception as e:
                    logger.debug(f"Playwright关闭失败: {e}")
                self.playwright = None
            logger.info("浏览器管理器资源清理完成")
# Global browser manager instance, created when this module is imported.
browser_manager = BrowserManager()