""" 人类行为模拟工具 模拟真实用户的行为模式,包括打字、点击、滚动等。 """ import random import asyncio from typing import Optional, Tuple, List from playwright.async_api import Page, Locator, ElementHandle from ..config.settings import settings from .logger import get_logger logger = get_logger(__name__) class HumanBehaviorSimulator: """人类行为模拟器""" def __init__(self, config=None): """ 初始化人类行为模拟器 Args: config: 人类行为配置,如果为None则使用默认配置 """ self.config = config or settings.human_behavior self.typing_speed = self.config.typing_speed # (min, max) ms/字符 self.pause_probability = self.config.pause_probability self.random_delay_range = self.config.random_delay_range # (min, max) 秒 self.click_delay_range = self.config.click_delay_range self.scroll_delay_range = self.config.scroll_delay_range async def human_type( self, page: Page, selector: str, text: str, clear_first: bool = True, delay: Optional[Tuple[float, float]] = None ): """ 模拟人类输入文字 Args: page: Playwright页面对象 selector: 元素选择器 text: 要输入的文本 clear_first: 是否先清空输入框 delay: 自定义打字延迟范围 """ try: element = await page.wait_for_selector(selector, timeout=10000) if not element: logger.warning(f"未找到输入元素: {selector}") return False # 确保元素可见和可编辑 await element.scroll_into_view_if_needed() await element.wait_for_element_state("visible") # 聚焦元素 await element.click() # 清空输入框 if clear_first: await element.fill("") # 分批输入文字,模拟真实打字 words = text.split() for i, word in enumerate(words): # 输入单词 for char in word: char_delay = delay or self.typing_speed delay_ms = random.randint(*char_delay) await element.type(char, delay=delay_ms) # 随机暂停 if random.random() < self.pause_probability: pause_time = random.uniform(*self.random_delay_range) await asyncio.sleep(pause_time) # 如果不是最后一个单词,添加空格 if i < len(words) - 1: await element.type(" ", delay=random.randint(*self.typing_speed)) logger.debug(f"人类化输入完成: {selector} -> {text[:20]}...") return True except Exception as e: logger.error(f"人类化输入失败: {selector} - {e}") return False async def human_click( self, page: Page, selector: str, position: Optional[Tuple[float, float]] = None, modifiers: Optional[List[str]] = None, delay: Optional[float] = None ): """ 模拟人类点击 Args: page: Playwright页面对象 selector: 元素选择器 position: 点击位置 (x, y) modifiers: 修饰键列表 ['Ctrl', 'Shift', 'Alt', 'Meta'] delay: 点击前延迟时间 """ try: element = await page.wait_for_selector(selector, timeout=10000) if not element: logger.warning(f"未找到点击元素: {selector}") return False # 等待元素可见和可点击 await element.scroll_into_view_if_needed() await element.wait_for_element_state("visible") # 添加随机延迟 click_delay = delay or random.uniform(*self.click_delay_range) await asyncio.sleep(click_delay) # 模拟鼠标移动到元素位置 bbox = await element.bounding_box() if bbox: # 在元素内部随机选择点击位置 if not position: x = bbox['x'] + bbox['width'] * random.uniform(0.2, 0.8) y = bbox['y'] + bbox['height'] * random.uniform(0.2, 0.8) position = (x, y) # 移动鼠标到点击位置 await page.mouse.move(position[0], position[1]) # 小幅随机移动,模拟真实手部抖动 offset_x = random.uniform(-2, 2) offset_y = random.uniform(-2, 2) await page.mouse.move(position[0] + offset_x, position[1] + offset_y) # 点击 await page.mouse.click(position[0] + offset_x, position[1] + offset_y, modifiers=modifiers or []) else: # 如果无法获取边界框,直接点击元素 await element.click(modifiers=modifiers or []) logger.debug(f"人类化点击完成: {selector}") return True except Exception as e: logger.error(f"人类化点击失败: {selector} - {e}") return False async def human_scroll( self, page: Page, distance: int = 300, direction: str = "down", steps: int = 3 ): """ 模拟人类滚动 Args: page: Playwright页面对象 distance: 滚动距离 direction: 滚动方向 ('up', 'down', 'left', 'right') steps: 滚动步数 """ try: if direction not in ['up', 'down', 'left', 'right']: direction = 'down' # 计算每步滚动距离 step_distance = distance / steps for i in range(steps): if direction == 'down': await page.mouse.wheel(0, step_distance) elif direction == 'up': await page.mouse.wheel(0, -step_distance) elif direction == 'right': await page.mouse.wheel(step_distance, 0) elif direction == 'left': await page.mouse.wheel(-step_distance, 0) # 每步之间添加随机延迟 scroll_delay = random.uniform(*self.scroll_delay_range) await asyncio.sleep(scroll_delay) logger.debug(f"人类化滚动完成: {direction} {distance}px") return True except Exception as e: logger.error(f"人类化滚动失败: {e}") return False async def human_select_option( self, page: Page, selector: str, value: str, delay: Optional[float] = None ): """ 模拟人类选择下拉选项 Args: page: Playwright页面对象 selector: 下拉框选择器 value: 选项值 delay: 操作前延迟 """ try: element = await page.wait_for_selector(selector, timeout=10000) if not element: logger.warning(f"未找到下拉框: {selector}") return False # 滚动到元素 await element.scroll_into_view_if_needed() # 添加随机延迟 if delay: await asyncio.sleep(delay) else: await asyncio.sleep(random.uniform(*self.random_delay_range)) # 点击下拉框 await element.click() # 等待选项出现并选择 option_selector = f"{selector} option[value='{value}']" await page.wait_for_selector(option_selector, timeout=5000) await page.select_option(selector, value) logger.debug(f"人类化选择完成: {selector} -> {value}") return True except Exception as e: logger.error(f"人类化选择失败: {selector} - {e}") return False async def human_drag_and_drop( self, page: Page, source_selector: str, target_selector: str, delay: Optional[float] = None ): """ 模拟人类拖拽操作 Args: page: Playwright页面对象 source_selector: 源元素选择器 target_selector: 目标元素选择器 delay: 操作前延迟 """ try: source_element = await page.wait_for_selector(source_selector, timeout=10000) target_element = await page.wait_for_selector(target_selector, timeout=10000) if not source_element or not target_element: logger.warning(f"未找到拖拽元素: {source_selector} -> {target_selector}") return False # 获取元素位置 source_bbox = await source_element.bounding_box() target_bbox = await target_element.bounding_box() if not source_bbox or not target_bbox: logger.error("无法获取元素位置信息") return False # 添加随机延迟 if delay: await asyncio.sleep(delay) else: await asyncio.sleep(random.uniform(*self.random_delay_range)) # 开始拖拽 source_x = source_bbox['x'] + source_bbox['width'] / 2 source_y = source_bbox['y'] + source_bbox['height'] / 2 target_x = target_bbox['x'] + target_bbox['width'] / 2 target_y = target_bbox['y'] + target_bbox['height'] / 2 await page.mouse.move(source_x, source_y) await page.mouse.down() # 模拟拖拽路径(不是直线) steps = 10 for i in range(1, steps + 1): progress = i / steps # 添加一些曲线变化 curve = math.sin(progress * math.pi) * 20 x = source_x + (target_x - source_x) * progress + random.uniform(-curve, curve) y = source_y + (target_y - source_y) * progress + random.uniform(-curve, curve) await page.mouse.move(x, y) await asyncio.sleep(0.05) await page.mouse.up() logger.debug(f"人类化拖拽完成: {source_selector} -> {target_selector}") return True except Exception as e: logger.error(f"人类化拖拽失败: {e}") return False async def random_wait(self, min_seconds: float = 1.0, max_seconds: float = 3.0): """ 随机等待 Args: min_seconds: 最小等待时间 max_seconds: 最大等待时间 """ wait_time = random.uniform(min_seconds, max_seconds) await asyncio.sleep(wait_time) logger.debug(f"随机等待: {wait_time:.2f}秒") async def simulate_reading(self, page: Page, reading_time: float = 5.0): """ 模拟阅读行为 Args: page: Playwright页面对象 reading_time: 阅读时间(秒) """ try: # 在阅读期间进行轻微的滚动和鼠标移动 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < reading_time: # 轻微滚动 scroll_distance = random.randint(-50, 50) await page.mouse.wheel(0, scroll_distance) # 鼠标小幅移动 viewport = page.viewport_size if viewport: x = random.randint(100, viewport['width'] - 100) y = random.randint(100, viewport['height'] - 100) await page.mouse.move(x, y) # 等待随机时间 await asyncio.sleep(random.uniform(0.5, 1.5)) logger.debug(f"模拟阅读完成: {reading_time}秒") return True except Exception as e: logger.error(f"模拟阅读失败: {e}") return False async def random_mouse_movement(self, page: Page, duration: float = 2.0): """ 随机鼠标移动 Args: page: Playwright页面对象 duration: 移动持续时间(秒) """ try: viewport = page.viewport_size if not viewport: return False start_time = asyncio.get_event_loop().time() current_x = viewport['width'] / 2 current_y = viewport['height'] / 2 while asyncio.get_event_loop().time() - start_time < duration: # 生成随机目标位置 target_x = random.randint(50, viewport['width'] - 50) target_y = random.randint(50, viewport['height'] - 50) # 平滑移动到目标位置 steps = 20 for i in range(steps + 1): progress = i / steps x = current_x + (target_x - current_x) * progress y = current_y + (target_y - current_y) * progress await page.mouse.move(x, y) await asyncio.sleep(0.03) current_x, current_y = target_x, target_y await asyncio.sleep(random.uniform(0.1, 0.3)) logger.debug(f"随机鼠标移动完成: {duration}秒") return True except Exception as e: logger.error(f"随机鼠标移动失败: {e}") return False # 创建全局实例 human_behavior = HumanBehaviorSimulator() # 导入math模块用于拖拽中的曲线计算 import math