337 lines
12 KiB
Python
337 lines
12 KiB
Python
"""
|
||
浏览器管理工具
|
||
基于Playwright的浏览器实例管理和反检测脚本注入。
|
||
"""
|
||
|
||
import asyncio
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, List
|
||
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
|
||
|
||
from ..config.settings import settings
|
||
from .exceptions import BrowserError, TimeoutError
|
||
from .logger import get_logger
|
||
|
||
# Module-level logger, named after this module via get_logger(__name__).
logger = get_logger(__name__)
|
||
|
||
|
||
class BrowserManager:
    """Manage Playwright browsers, contexts and pages for the application.

    The manager lazily starts Playwright, caches launched Chromium browsers
    and browser contexts in dictionaries, and installs an anti-detection
    (stealth) script on every page it creates. Mutations of the caches made
    by ``initialize``, ``get_browser``, ``get_context`` and ``cleanup`` are
    serialized through a single ``asyncio.Lock``.
    """

    def __init__(self):
        """Create an empty manager; Playwright itself is started by ``initialize``."""
        self.playwright: Optional[Playwright] = None
        # Launched browsers, keyed by a string built from the launch options.
        self.browsers: Dict[str, Browser] = {}
        # Created contexts, keyed by browser identity or persistent profile dir.
        self.contexts: Dict[str, BrowserContext] = {}
        # JavaScript source injected into new pages; filled in by initialize().
        self.stealth_script: Optional[str] = None
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Start Playwright and load the stealth script if not done already."""
        async with self._lock:
            if self.playwright is None:
                self.playwright = await async_playwright().start()
                await self._load_stealth_script()
                logger.info("浏览器管理器初始化完成")

    async def _load_stealth_script(self):
        """Load ``assets/stealth.min.js`` or fall back to the built-in script.

        Never raises: any failure while reading the file is logged and the
        built-in basic script is used instead.
        """
        script_path = Path(__file__).parent.parent / "assets" / "stealth.min.js"
        try:
            if script_path.exists():
                self.stealth_script = script_path.read_text(encoding='utf-8')
                logger.debug("反检测脚本加载成功")
            else:
                # No bundled script on disk: use the minimal built-in one.
                self.stealth_script = self._get_basic_stealth_script()
                logger.debug("使用内置反检测脚本")
        except Exception as e:
            logger.warning(f"反检测脚本加载失败: {e}")
            self.stealth_script = self._get_basic_stealth_script()

    def _get_basic_stealth_script(self) -> str:
        """Return the built-in JavaScript used to hide automation markers.

        The script overrides ``navigator.webdriver``, ``navigator.plugins``
        and ``navigator.languages``, defines ``window.chrome`` and wraps
        ``navigator.permissions.query`` for the ``notifications`` permission.

        Returns:
            The JavaScript source as a string.
        """
        return """
// 基本反检测脚本
(() => {
    // 移除webdriver属性
    Object.defineProperty(navigator, 'webdriver', {
        get: () => undefined,
    });

    // 修改plugins
    Object.defineProperty(navigator, 'plugins', {
        get: () => [1, 2, 3, 4, 5],
    });

    // 修改languages
    Object.defineProperty(navigator, 'languages', {
        get: () => ['zh-CN', 'zh', 'en'],
    });

    // 添加chrome属性
    window.chrome = {
        runtime: {},
    };

    // 重写permission query
    const permissions = window.navigator.permissions;
    if (permissions && permissions.query) {
        // The native method must stay bound to its Permissions object,
        // otherwise calling it throws "Illegal invocation".
        const originalQuery = permissions.query.bind(permissions);
        permissions.query = (parameters) => (
            parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
        );
    }
})();
"""

    async def get_browser(
        self,
        headless: Optional[bool] = None,
        user_data_dir: Optional[str] = None,
        proxy: Optional[Dict[str, Any]] = None
    ) -> Browser:
        """Return a cached Chromium browser or launch a new one.

        Args:
            headless: Run without a window; defaults to
                ``settings.browser.headless`` when ``None``.
            user_data_dir: Only used to partition the browser cache. It is
                NOT passed to Chromium here; persistent profiles are created
                by ``get_context(user_data_dir=...)``.
            proxy: Playwright proxy settings; defaults to
                ``{"server": settings.browser.proxy}`` when that setting is set.

        Returns:
            A connected ``Browser`` instance.

        Raises:
            BrowserError: If the browser cannot be launched.
        """
        await self.initialize()

        headless = headless if headless is not None else settings.browser.headless
        proxy = proxy or ({"server": settings.browser.proxy} if settings.browser.proxy else None)

        # The proxy is part of the cache key so that a request for a different
        # proxy never reuses a browser that was launched without it.
        if proxy:
            proxy_key = f"{proxy.get('server', '')}|{proxy.get('username', '')}"
        else:
            proxy_key = "direct"
        browser_id = f"headless_{headless}_{user_data_dir or 'default'}_proxy_{proxy_key}"

        async with self._lock:
            browser = self.browsers.get(browser_id)
            if browser is None or not browser.is_connected():
                browser_args = self._get_browser_args(headless)
                try:
                    browser = await self.playwright.chromium.launch(
                        headless=headless,
                        args=browser_args,
                        proxy=proxy,
                        executable_path=settings.browser.executable_path
                    )
                except Exception as e:
                    raise BrowserError(f"浏览器启动失败: {e}") from e
                self.browsers[browser_id] = browser
                logger.info(f"浏览器实例创建成功: {browser_id}")

            return browser

    def _get_browser_args(self, headless: Optional[bool] = None) -> List[str]:
        """Build the Chromium command-line switches used at launch.

        Args:
            headless: The mode the browser is launched in. When ``None`` the
                value of ``settings.browser.headless`` is used.

        Returns:
            A new list of command-line switches.
        """
        if headless is None:
            headless = settings.browser.headless

        args = [
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-infobars',
            '--window-position=0,0',
            # Spelling matters: a misspelled switch is silently ignored.
            '--ignore-certificate-errors',
            '--ignore-certificate-errors-spki-list',
            '--disable-blink-features=AutomationControlled',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-images',  # drop this switch if images are needed
            # JavaScript is intentionally NOT disabled: pages and the stealth
            # script both need it.
            '--disable-default-apps',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--disable-background-networking',
            '--disable-ipc-flooding-protection',
            '--enable-automation',
            '--password-store=basic',
            '--use-mock-keychain',
            '--no-first-run',
            '--no-default-browser-check',
            '--disable-fre',
            # A repeated switch is overridden by its last occurrence, so all
            # disabled features are listed in one comma-separated switch.
            '--disable-features=VizDisplayCompositor,TranslateUI',
        ]

        if not headless:
            args.extend([
                '--start-maximized',
                '--disable-web-security',
            ])

        return args

    async def _launch_persistent_context(
        self,
        user_data_dir: str,
        context_options: Dict[str, Any]
    ) -> BrowserContext:
        """Launch Chromium with a persistent profile stored in *user_data_dir*."""
        headless = settings.browser.headless
        proxy = {"server": settings.browser.proxy} if settings.browser.proxy else None
        return await self.playwright.chromium.launch_persistent_context(
            user_data_dir,
            headless=headless,
            args=self._get_browser_args(headless),
            proxy=proxy,
            executable_path=settings.browser.executable_path,
            **context_options
        )

    async def get_context(
        self,
        browser: Optional[Browser] = None,
        user_data_dir: Optional[str] = None,
        viewport: Optional[Dict[str, int]] = None,
        user_agent: Optional[str] = None
    ) -> BrowserContext:
        """Return a cached browser context or create a new one.

        Args:
            browser: Browser to create the context in; defaults to the browser
                returned by ``get_browser()``. Ignored when ``user_data_dir``
                is given, because a persistent profile launches its own
                browser process.
            user_data_dir: Profile directory. ``Browser.new_context`` has no
                such option, so a persistent context is launched instead.
            viewport: ``{"width": ..., "height": ...}``; defaults to the
                viewport size from ``settings.browser``.
            user_agent: User agent override; defaults to
                ``settings.browser.user_agent`` when that is set.

        Returns:
            The cached or newly created ``BrowserContext``.

        Raises:
            BrowserError: If the context cannot be created.
        """
        context_options: Dict[str, Any] = {
            'viewport': viewport or {
                'width': settings.browser.viewport_width,
                'height': settings.browser.viewport_height
            },
            'locale': settings.browser.locale,
            'timezone_id': settings.browser.timezone,
        }
        if user_agent or settings.browser.user_agent:
            context_options['user_agent'] = user_agent or settings.browser.user_agent

        if user_data_dir:
            # Must run before taking the lock: initialize() acquires it too.
            await self.initialize()
            context_id = f"persistent_{user_data_dir}"
        else:
            if browser is None:
                browser = await self.get_browser()
            # Key on object identity: the string form of a Browser is not
            # guaranteed to differ between two browser instances.
            context_id = f"{id(browser)}_default"

        async with self._lock:
            if context_id not in self.contexts:
                try:
                    if user_data_dir:
                        context = await self._launch_persistent_context(
                            user_data_dir, context_options
                        )
                    else:
                        context = await browser.new_context(**context_options)
                except Exception as e:
                    raise BrowserError(f"浏览器上下文创建失败: {e}") from e
                self.contexts[context_id] = context
                logger.debug(f"浏览器上下文创建成功: {context_id}")

            return self.contexts[context_id]

    async def get_page(
        self,
        context: Optional[BrowserContext] = None,
        browser: Optional[Browser] = None
    ) -> Page:
        """Open a new page with the default timeout, stealth script and listeners.

        Args:
            context: Context to open the page in; defaults to
                ``get_context(browser)``.
            browser: Browser used to obtain the default context.

        Returns:
            The newly created ``Page``.

        Raises:
            BrowserError: If the page cannot be created or configured.
        """
        if context is None:
            context = await self.get_context(browser)

        try:
            page = await context.new_page()

            # Default timeout for page operations.
            page.set_default_timeout(settings.browser.timeout)

            # Register the stealth script as an init script for this page.
            if self.stealth_script:
                await page.add_init_script(self.stealth_script)

            # Log page errors and failed requests.
            page.on("pageerror", self._handle_page_error)
            page.on("requestfailed", self._handle_request_failed)

            logger.debug("新页面创建成功")
            return page
        except Exception as e:
            raise BrowserError(f"页面创建失败: {e}") from e

    async def _handle_page_error(self, error):
        """Log an error reported by the page's ``pageerror`` event."""
        logger.warning(f"页面错误: {error}")

    async def _handle_request_failed(self, request):
        """Log the URL and failure of a request from the ``requestfailed`` event."""
        url = request.url
        failure = request.failure
        logger.debug(f"请求失败: {url} - {failure}")

    async def inject_stealth_script(self, page: Page):
        """Evaluate the stealth script in *page*; failures are only logged."""
        if self.stealth_script:
            try:
                await page.evaluate(self.stealth_script)
                logger.debug("反检测脚本注入成功")
            except Exception as e:
                logger.warning(f"反检测脚本注入失败: {e}")

    async def take_screenshot(self, page: Page, file_path: str, full_page: bool = True):
        """Save a screenshot of *page* to *file_path*.

        Best effort: a failure is logged as an error and not re-raised.

        Args:
            page: Page to capture.
            file_path: Destination path of the image.
            full_page: Passed through to ``page.screenshot``.
        """
        try:
            await page.screenshot(path=file_path, full_page=full_page)
            logger.info(f"页面截图保存成功: {file_path}")
        except Exception as e:
            logger.error(f"页面截图失败: {e}")

    async def wait_for_network_idle(self, page: Page, timeout: float = 5000):
        """Wait until *page* reaches the ``networkidle`` load state.

        Args:
            page: Page to wait on.
            timeout: Maximum wait, passed to ``wait_for_load_state``.

        Raises:
            TimeoutError: If waiting fails for any reason; the original
                exception is attached as the cause.
        """
        try:
            await page.wait_for_load_state("networkidle", timeout=timeout)
        except Exception as e:
            raise TimeoutError(f"等待网络空闲超时: {timeout}ms", timeout=timeout) from e

    async def close_page(self, page: Page):
        """Close *page*; failures are only logged."""
        try:
            await page.close()
            logger.debug("页面关闭成功")
        except Exception as e:
            logger.warning(f"页面关闭失败: {e}")

    async def close_context(self, context: BrowserContext):
        """Remove *context* from the cache and close it; failures are only logged."""
        try:
            # Drop every cache entry that refers to this exact object.
            for cid in [k for k, ctx in self.contexts.items() if ctx is context]:
                del self.contexts[cid]

            await context.close()
            logger.debug("浏览器上下文关闭成功")
        except Exception as e:
            logger.warning(f"浏览器上下文关闭失败: {e}")

    async def close_browser(self, browser: Browser):
        """Remove *browser* and its cached contexts, then close it.

        Failures are only logged.
        """
        try:
            for bid in [k for k, br in self.browsers.items() if br is browser]:
                del self.browsers[bid]

            # Contexts of a closed browser are unusable; do not keep them cached.
            stale = [
                k for k, ctx in self.contexts.items()
                if getattr(ctx, "browser", None) is browser
            ]
            for cid in stale:
                del self.contexts[cid]

            await browser.close()
            logger.debug("浏览器关闭成功")
        except Exception as e:
            logger.warning(f"浏览器关闭失败: {e}")

    async def cleanup(self):
        """Close all cached contexts and browsers, then stop Playwright.

        Each close is best effort: an error is logged at debug level and the
        cleanup continues with the remaining resources.
        """
        async with self._lock:
            # Close all contexts.
            for context in list(self.contexts.values()):
                try:
                    await context.close()
                except Exception as e:
                    logger.debug(f"清理浏览器上下文时出错(已忽略): {e}")
            self.contexts.clear()

            # Close all browsers.
            for browser in list(self.browsers.values()):
                try:
                    await browser.close()
                except Exception as e:
                    logger.debug(f"清理浏览器时出错(已忽略): {e}")
            self.browsers.clear()

            # Stop Playwright.
            if self.playwright:
                try:
                    await self.playwright.stop()
                except Exception as e:
                    logger.debug(f"停止Playwright时出错(已忽略): {e}")
                self.playwright = None

            logger.info("浏览器管理器资源清理完成")
|
||
|
||
|
||
# Global browser manager instance shared by importers of this module.
browser_manager = BrowserManager()