337 lines
12 KiB
Python
337 lines
12 KiB
Python
|
|
"""
|
|||
|
|
浏览器管理工具
|
|||
|
|
基于Playwright的浏览器实例管理和反检测脚本注入。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import asyncio
|
|||
|
|
import os
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Optional, Dict, Any, List
|
|||
|
|
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
|
|||
|
|
|
|||
|
|
from ..config.settings import settings
|
|||
|
|
from .exceptions import BrowserError, TimeoutError
|
|||
|
|
from .logger import get_logger
|
|||
|
|
|
|||
|
|
# Module-level logger obtained from the project's get_logger helper, keyed by this module's name.
logger = get_logger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BrowserManager:
    """Browser instance manager."""

    def __init__(self):
        """Create an idle manager; nothing is started or launched here."""
        # Lock guarding the mutable state below.
        self._lock = asyncio.Lock()
        # Anti-detection script source; None until one has been loaded.
        self.stealth_script: Optional[str] = None
        # Playwright handle; None while Playwright has not been started.
        self.playwright: Optional[Playwright] = None
        # Caches of live objects, keyed by string identifiers.
        self.contexts: Dict[str, BrowserContext] = {}
        self.browsers: Dict[str, Browser] = {}
|
|||
|
|
|
|||
|
|
async def initialize(self):
    """Start Playwright and load the stealth script, once.

    Runs under the manager lock. When ``self.playwright`` is already set
    the call returns without doing anything.
    """
    async with self._lock:
        if self.playwright is not None:
            return

        self.playwright = await async_playwright().start()
        await self._load_stealth_script()
        logger.info("浏览器管理器初始化完成")
|
|||
|
|
|
|||
|
|
async def _load_stealth_script(self):
    """Populate ``self.stealth_script``.

    Reads ``assets/stealth.min.js`` (located two directories above this
    file) as UTF-8 when it exists. If the file is missing, or anything in
    the process raises, the built-in basic script is used instead.
    """
    try:
        assets_dir = Path(__file__).parent.parent / "assets"
        script_path = assets_dir / "stealth.min.js"

        if not script_path.exists():
            # No bundled script on disk: use the built-in minimal one.
            self.stealth_script = self._get_basic_stealth_script()
            logger.debug("使用内置反检测脚本")
        else:
            with open(script_path, 'r', encoding='utf-8') as script_file:
                self.stealth_script = script_file.read()
            logger.debug("反检测脚本加载成功")
    except Exception as exc:
        logger.warning(f"反检测脚本加载失败: {exc}")
        self.stealth_script = self._get_basic_stealth_script()
|
|||
|
|
|
|||
|
|
def _get_basic_stealth_script(self) -> str:
|
|||
|
|
"""获取基本的反检测脚本"""
|
|||
|
|
return """
|
|||
|
|
// 基本反检测脚本
|
|||
|
|
(() => {
|
|||
|
|
// 移除webdriver属性
|
|||
|
|
Object.defineProperty(navigator, 'webdriver', {
|
|||
|
|
get: () => undefined,
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 修改plugins
|
|||
|
|
Object.defineProperty(navigator, 'plugins', {
|
|||
|
|
get: () => [1, 2, 3, 4, 5],
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 修改languages
|
|||
|
|
Object.defineProperty(navigator, 'languages', {
|
|||
|
|
get: () => ['zh-CN', 'zh', 'en'],
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 添加chrome属性
|
|||
|
|
window.chrome = {
|
|||
|
|
runtime: {},
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 重写permission query
|
|||
|
|
const originalQuery = window.navigator.permissions.query;
|
|||
|
|
window.navigator.permissions.query = (parameters) => (
|
|||
|
|
parameters.name === 'notifications' ?
|
|||
|
|
Promise.resolve({ state: Notification.permission }) :
|
|||
|
|
originalQuery(parameters)
|
|||
|
|
);
|
|||
|
|
})();
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
async def get_browser(
    self,
    headless: Optional[bool] = None,
    user_data_dir: Optional[str] = None,
    proxy: Optional[Dict[str, Any]] = None
) -> Browser:
    """Return a cached Chromium browser, launching one when needed.

    Browsers are cached per (headless, user_data_dir, proxy server)
    combination. The proxy is part of the key so that a request for a
    different proxy never receives a browser launched with another one.
    A cached browser that is no longer connected is replaced.

    Args:
        headless: Whether to launch headless; falls back to
            ``settings.browser.headless`` when None.
        user_data_dir: Only used to tell cached browsers apart; it is not
            passed to ``chromium.launch``.
        proxy: Proxy settings handed to ``chromium.launch``; when empty,
            ``{"server": settings.browser.proxy}`` is used if that
            setting is set.

    Returns:
        The cached or newly launched browser.

    Raises:
        BrowserError: If launching the browser fails (the original
            exception is chained as ``__cause__``).
    """
    await self.initialize()

    if headless is None:
        headless = settings.browser.headless
    if not proxy:
        proxy = {"server": settings.browser.proxy} if settings.browser.proxy else None

    # Build the cache key.
    proxy_key = proxy.get("server", "custom") if proxy else "direct"
    browser_id = f"headless_{headless}_{user_data_dir or 'default'}_{proxy_key}"

    async with self._lock:
        cached = self.browsers.get(browser_id)
        if cached is not None and cached.is_connected():
            return cached

        browser_args = self._get_browser_args()
        try:
            browser = await self.playwright.chromium.launch(
                headless=headless,
                args=browser_args,
                proxy=proxy,
                executable_path=settings.browser.executable_path
            )
        except Exception as e:
            raise BrowserError(f"浏览器启动失败: {e}") from e

        self.browsers[browser_id] = browser
        logger.info(f"浏览器实例创建成功: {browser_id}")
        return browser
|
|||
|
|
|
|||
|
|
def _get_browser_args(self) -> List[str]:
|
|||
|
|
"""获取浏览器启动参数"""
|
|||
|
|
args = [
|
|||
|
|
'--no-sandbox',
|
|||
|
|
'--disable-setuid-sandbox',
|
|||
|
|
'--disable-infobars',
|
|||
|
|
'--window-position=0,0',
|
|||
|
|
'--ignore-certifcate-errors',
|
|||
|
|
'--ignore-certifcate-errors-spki-list',
|
|||
|
|
'--disable-blink-features=AutomationControlled',
|
|||
|
|
'--disable-features=VizDisplayCompositor',
|
|||
|
|
'--disable-extensions',
|
|||
|
|
'--disable-plugins',
|
|||
|
|
'--disable-images', # 可以根据需要开启
|
|||
|
|
'--disable-javascript', # 不能关闭,需要JS执行
|
|||
|
|
'--disable-default-apps',
|
|||
|
|
'--disable-background-timer-throttling',
|
|||
|
|
'--disable-backgrounding-occluded-windows',
|
|||
|
|
'--disable-renderer-backgrounding',
|
|||
|
|
'--disable-background-networking',
|
|||
|
|
'--disable-features=TranslateUI',
|
|||
|
|
'--disable-ipc-flooding-protection',
|
|||
|
|
'--enable-automation',
|
|||
|
|
'--password-store=basic',
|
|||
|
|
'--use-mock-keychain',
|
|||
|
|
'--no-first-run',
|
|||
|
|
'--no-default-browser-check',
|
|||
|
|
'--disable-fre',
|
|||
|
|
'--disable-features=VizDisplayCompositor'
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
if not settings.browser.headless:
|
|||
|
|
args.extend([
|
|||
|
|
'--start-maximized',
|
|||
|
|
'--disable-web-security',
|
|||
|
|
'--disable-features=VizDisplayCompositor'
|
|||
|
|
])
|
|||
|
|
|
|||
|
|
return args
|
|||
|
|
|
|||
|
|
async def get_context(
    self,
    browser: Optional[Browser] = None,
    user_data_dir: Optional[str] = None,
    viewport: Optional[Dict[str, int]] = None,
    user_agent: Optional[str] = None
) -> BrowserContext:
    """Return a cached browser context, creating one when needed.

    Contexts are cached per (browser object, user_data_dir) pair.

    Args:
        browser: Browser that owns the context; obtained from
            ``get_browser()`` when None.
        user_data_dir: Only used to tell cached contexts apart. It is not
            forwarded to ``browser.new_context``, which does not accept
            such an option; forwarding it made every call fail.
        viewport: ``{"width": ..., "height": ...}``; falls back to the
            viewport size in ``settings.browser``.
        user_agent: User agent string; falls back to
            ``settings.browser.user_agent`` when that is set.

    Returns:
        The cached or newly created context.

    Raises:
        BrowserError: If creating the context fails (the original
            exception is chained as ``__cause__``).
    """
    if browser is None:
        browser = await self.get_browser()

    # Key on the object's identity: the string form of a Browser describes
    # its type and version, so two different browsers can format the same
    # and would then share one cache slot.
    context_id = f"{id(browser)}_{user_data_dir or 'default'}"

    async with self._lock:
        cached = self.contexts.get(context_id)
        if cached is not None:
            return cached

        context_options = {
            'viewport': viewport or {
                'width': settings.browser.viewport_width,
                'height': settings.browser.viewport_height
            },
            'locale': settings.browser.locale,
            'timezone_id': settings.browser.timezone,
        }

        if user_data_dir:
            logger.warning(
                "user_data_dir 仅用于区分缓存的上下文，new_context 不支持持久化目录"
            )

        effective_user_agent = user_agent or settings.browser.user_agent
        if effective_user_agent:
            context_options['user_agent'] = effective_user_agent

        try:
            context = await browser.new_context(**context_options)
        except Exception as e:
            raise BrowserError(f"浏览器上下文创建失败: {e}") from e

        self.contexts[context_id] = context
        logger.debug(f"浏览器上下文创建成功: {context_id}")
        return context
|
|||
|
|
|
|||
|
|
async def get_page(
    self,
    context: Optional[BrowserContext] = None,
    browser: Optional[Browser] = None
) -> Page:
    """Create a new page and prepare it for use.

    The page gets the default timeout from ``settings.browser.timeout``,
    the stealth script as an init script (when one is loaded), and
    listeners for the ``pageerror`` and ``requestfailed`` events.

    Args:
        context: Context to open the page in; obtained from
            ``get_context(browser)`` when None.
        browser: Only used when ``context`` is None.

    Returns:
        The prepared page.

    Raises:
        BrowserError: If creating or preparing the page fails (the
            original exception is chained as ``__cause__``). Errors from
            ``get_context`` propagate unchanged.
    """
    if context is None:
        context = await self.get_context(browser)

    page = None
    try:
        page = await context.new_page()

        # Default timeout for page operations.
        page.set_default_timeout(settings.browser.timeout)

        # Register the anti-detection script as an init script.
        if self.stealth_script:
            await page.add_init_script(self.stealth_script)

        # Listen for page errors and failed requests.
        page.on("pageerror", self._handle_page_error)
        page.on("requestfailed", self._handle_request_failed)

        logger.debug("新页面创建成功")
        return page
    except Exception as e:
        # Do not leak a page that was opened but could not be prepared.
        if page is not None:
            try:
                await page.close()
            except Exception as close_error:
                logger.debug(f"页面关闭失败: {close_error}")
        raise BrowserError(f"页面创建失败: {e}") from e
|
|||
|
|
|
|||
|
|
async def _handle_page_error(self, error):
    """Log an error delivered by a page's "pageerror" event as a warning."""
    message = f"页面错误: {error}"
    logger.warning(message)
|
|||
|
|
|
|||
|
|
async def _handle_request_failed(self, request):
    """Log the URL and failure of a request delivered by "requestfailed"."""
    logger.debug(f"请求失败: {request.url} - {request.failure}")
|
|||
|
|
|
|||
|
|
async def inject_stealth_script(self, page: Page):
    """Evaluate the loaded stealth script in ``page``.

    Does nothing when no script is loaded. Evaluation errors are logged
    as warnings and not re-raised.
    """
    script = self.stealth_script
    if not script:
        return

    try:
        await page.evaluate(script)
        logger.debug("反检测脚本注入成功")
    except Exception as exc:
        logger.warning(f"反检测脚本注入失败: {exc}")
|
|||
|
|
|
|||
|
|
async def take_screenshot(self, page: Page, file_path: str, full_page: bool = True):
    """Save a screenshot of ``page`` to ``file_path``.

    Args:
        page: Page to capture.
        file_path: Destination path handed to ``page.screenshot``.
        full_page: Forwarded to ``page.screenshot`` as ``full_page``.

    Failures are logged as errors and not re-raised.
    """
    shot_options = {"path": file_path, "full_page": full_page}
    try:
        await page.screenshot(**shot_options)
        logger.info(f"页面截图保存成功: {file_path}")
    except Exception as exc:
        logger.error(f"页面截图失败: {exc}")
|
|||
|
|
|
|||
|
|
async def wait_for_network_idle(self, page: Page, timeout: float = 5000):
    """Wait until ``page`` reaches the "networkidle" load state.

    Args:
        page: Page to wait on.
        timeout: Limit forwarded to ``page.wait_for_load_state``; the
            error message reports it in milliseconds.

    Raises:
        TimeoutError: The exception imported from ``.exceptions``, raised
            for any failure while waiting, not only for timeouts. The
            underlying exception is chained as ``__cause__`` so the real
            reason is not lost.
    """
    try:
        await page.wait_for_load_state("networkidle", timeout=timeout)
    except Exception as e:
        raise TimeoutError(f"等待网络空闲超时: {timeout}ms", timeout=timeout) from e
|
|||
|
|
|
|||
|
|
async def close_page(self, page: Page):
    """Close ``page``; a failure is logged as a warning and not re-raised."""
    try:
        await page.close()
        logger.debug("页面关闭成功")
    except Exception as exc:
        logger.warning(f"页面关闭失败: {exc}")
|
|||
|
|
|
|||
|
|
async def close_context(self, context: BrowserContext):
    """Drop ``context`` from the cache and close it.

    The cache entry (if any) is removed before the context is closed.
    Failures are logged as warnings and not re-raised.
    """
    try:
        # Find the cache key under which this context is stored, if any.
        cached_key = next(
            (key for key, cached in self.contexts.items() if cached == context),
            None,
        )
        if cached_key:
            self.contexts.pop(cached_key)

        await context.close()
        logger.debug("浏览器上下文关闭成功")
    except Exception as exc:
        logger.warning(f"浏览器上下文关闭失败: {exc}")
|
|||
|
|
|
|||
|
|
async def close_browser(self, browser: Browser):
    """Drop ``browser`` and its contexts from the caches, then close it.

    Cached contexts whose ``browser`` attribute is this browser are
    removed as well. They stop being usable once the browser is closed,
    and leaving them cached let ``get_context`` hand out dead contexts.

    Failures are logged as warnings and not re-raised.
    """
    try:
        # Forget every cached context owned by this browser.
        stale_context_ids = [
            cid for cid, ctx in self.contexts.items() if ctx.browser == browser
        ]
        for cid in stale_context_ids:
            del self.contexts[cid]

        # Remove the browser itself from the cache.
        browser_id = next(
            (bid for bid, br in self.browsers.items() if br == browser),
            None,
        )
        if browser_id is not None:
            del self.browsers[browser_id]

        await browser.close()
        logger.debug("浏览器关闭成功")
    except Exception as e:
        logger.warning(f"浏览器关闭失败: {e}")
|
|||
|
|
|
|||
|
|
async def cleanup(self):
    """Close every cached context and browser, then stop Playwright.

    Runs under the manager lock. Each close is best-effort: a failure is
    logged and the remaining resources are still processed. Only
    ``Exception`` is caught, so task cancellation and interpreter-exit
    signals are no longer swallowed as they were by the bare ``except``.
    """
    async with self._lock:
        # Close all contexts.
        for context in list(self.contexts.values()):
            try:
                await context.close()
            except Exception as e:
                logger.debug(f"浏览器上下文关闭失败: {e}")
        self.contexts.clear()

        # Close all browsers.
        for browser in list(self.browsers.values()):
            try:
                await browser.close()
            except Exception as e:
                logger.debug(f"浏览器关闭失败: {e}")
        self.browsers.clear()

        # Stop Playwright.
        if self.playwright:
            try:
                await self.playwright.stop()
            except Exception as e:
                logger.debug(f"Playwright停止失败: {e}")
            self.playwright = None

        logger.info("浏览器管理器资源清理完成")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Global browser manager instance.
browser_manager = BrowserManager()
|