405 lines
14 KiB
Python
405 lines
14 KiB
Python
"""
|
||
人类行为模拟工具
|
||
模拟真实用户的行为模式,包括打字、点击、滚动等。
|
||
"""
|
||
|
||
import random
|
||
import asyncio
|
||
from typing import Optional, Tuple, List
|
||
from playwright.async_api import Page, Locator, ElementHandle
|
||
|
||
from ..config.settings import settings
|
||
from .logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class HumanBehaviorSimulator:
|
||
"""人类行为模拟器"""
|
||
|
||
def __init__(self, config=None):
|
||
"""
|
||
初始化人类行为模拟器
|
||
|
||
Args:
|
||
config: 人类行为配置,如果为None则使用默认配置
|
||
"""
|
||
self.config = config or settings.human_behavior
|
||
self.typing_speed = self.config.typing_speed # (min, max) ms/字符
|
||
self.pause_probability = self.config.pause_probability
|
||
self.random_delay_range = self.config.random_delay_range # (min, max) 秒
|
||
self.click_delay_range = self.config.click_delay_range
|
||
self.scroll_delay_range = self.config.scroll_delay_range
|
||
|
||
async def human_type(
|
||
self,
|
||
page: Page,
|
||
selector: str,
|
||
text: str,
|
||
clear_first: bool = True,
|
||
delay: Optional[Tuple[float, float]] = None
|
||
):
|
||
"""
|
||
模拟人类输入文字
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
selector: 元素选择器
|
||
text: 要输入的文本
|
||
clear_first: 是否先清空输入框
|
||
delay: 自定义打字延迟范围
|
||
"""
|
||
try:
|
||
element = await page.wait_for_selector(selector, timeout=10000)
|
||
if not element:
|
||
logger.warning(f"未找到输入元素: {selector}")
|
||
return False
|
||
|
||
# 确保元素可见和可编辑
|
||
await element.scroll_into_view_if_needed()
|
||
await element.wait_for_element_state("visible")
|
||
|
||
# 聚焦元素
|
||
await element.click()
|
||
|
||
# 清空输入框
|
||
if clear_first:
|
||
await element.fill("")
|
||
|
||
# 分批输入文字,模拟真实打字
|
||
words = text.split()
|
||
for i, word in enumerate(words):
|
||
# 输入单词
|
||
for char in word:
|
||
char_delay = delay or self.typing_speed
|
||
delay_ms = random.randint(*char_delay)
|
||
await element.type(char, delay=delay_ms)
|
||
|
||
# 随机暂停
|
||
if random.random() < self.pause_probability:
|
||
pause_time = random.uniform(*self.random_delay_range)
|
||
await asyncio.sleep(pause_time)
|
||
|
||
# 如果不是最后一个单词,添加空格
|
||
if i < len(words) - 1:
|
||
await element.type(" ", delay=random.randint(*self.typing_speed))
|
||
|
||
logger.debug(f"人类化输入完成: {selector} -> {text[:20]}...")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"人类化输入失败: {selector} - {e}")
|
||
return False
|
||
|
||
async def human_click(
|
||
self,
|
||
page: Page,
|
||
selector: str,
|
||
position: Optional[Tuple[float, float]] = None,
|
||
modifiers: Optional[List[str]] = None,
|
||
delay: Optional[float] = None
|
||
):
|
||
"""
|
||
模拟人类点击
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
selector: 元素选择器
|
||
position: 点击位置 (x, y)
|
||
modifiers: 修饰键列表 ['Ctrl', 'Shift', 'Alt', 'Meta']
|
||
delay: 点击前延迟时间
|
||
"""
|
||
try:
|
||
element = await page.wait_for_selector(selector, timeout=10000)
|
||
if not element:
|
||
logger.warning(f"未找到点击元素: {selector}")
|
||
return False
|
||
|
||
# 等待元素可见和可点击
|
||
await element.scroll_into_view_if_needed()
|
||
await element.wait_for_element_state("visible")
|
||
|
||
# 添加随机延迟
|
||
click_delay = delay or random.uniform(*self.click_delay_range)
|
||
await asyncio.sleep(click_delay)
|
||
|
||
# 模拟鼠标移动到元素位置
|
||
bbox = await element.bounding_box()
|
||
if bbox:
|
||
# 在元素内部随机选择点击位置
|
||
if not position:
|
||
x = bbox['x'] + bbox['width'] * random.uniform(0.2, 0.8)
|
||
y = bbox['y'] + bbox['height'] * random.uniform(0.2, 0.8)
|
||
position = (x, y)
|
||
|
||
# 移动鼠标到点击位置
|
||
await page.mouse.move(position[0], position[1])
|
||
|
||
# 小幅随机移动,模拟真实手部抖动
|
||
offset_x = random.uniform(-2, 2)
|
||
offset_y = random.uniform(-2, 2)
|
||
await page.mouse.move(position[0] + offset_x, position[1] + offset_y)
|
||
|
||
# 点击
|
||
await page.mouse.click(position[0] + offset_x, position[1] + offset_y, modifiers=modifiers or [])
|
||
else:
|
||
# 如果无法获取边界框,直接点击元素
|
||
await element.click(modifiers=modifiers or [])
|
||
|
||
logger.debug(f"人类化点击完成: {selector}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"人类化点击失败: {selector} - {e}")
|
||
return False
|
||
|
||
async def human_scroll(
|
||
self,
|
||
page: Page,
|
||
distance: int = 300,
|
||
direction: str = "down",
|
||
steps: int = 3
|
||
):
|
||
"""
|
||
模拟人类滚动
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
distance: 滚动距离
|
||
direction: 滚动方向 ('up', 'down', 'left', 'right')
|
||
steps: 滚动步数
|
||
"""
|
||
try:
|
||
if direction not in ['up', 'down', 'left', 'right']:
|
||
direction = 'down'
|
||
|
||
# 计算每步滚动距离
|
||
step_distance = distance / steps
|
||
|
||
for i in range(steps):
|
||
if direction == 'down':
|
||
await page.mouse.wheel(0, step_distance)
|
||
elif direction == 'up':
|
||
await page.mouse.wheel(0, -step_distance)
|
||
elif direction == 'right':
|
||
await page.mouse.wheel(step_distance, 0)
|
||
elif direction == 'left':
|
||
await page.mouse.wheel(-step_distance, 0)
|
||
|
||
# 每步之间添加随机延迟
|
||
scroll_delay = random.uniform(*self.scroll_delay_range)
|
||
await asyncio.sleep(scroll_delay)
|
||
|
||
logger.debug(f"人类化滚动完成: {direction} {distance}px")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"人类化滚动失败: {e}")
|
||
return False
|
||
|
||
async def human_select_option(
|
||
self,
|
||
page: Page,
|
||
selector: str,
|
||
value: str,
|
||
delay: Optional[float] = None
|
||
):
|
||
"""
|
||
模拟人类选择下拉选项
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
selector: 下拉框选择器
|
||
value: 选项值
|
||
delay: 操作前延迟
|
||
"""
|
||
try:
|
||
element = await page.wait_for_selector(selector, timeout=10000)
|
||
if not element:
|
||
logger.warning(f"未找到下拉框: {selector}")
|
||
return False
|
||
|
||
# 滚动到元素
|
||
await element.scroll_into_view_if_needed()
|
||
|
||
# 添加随机延迟
|
||
if delay:
|
||
await asyncio.sleep(delay)
|
||
else:
|
||
await asyncio.sleep(random.uniform(*self.random_delay_range))
|
||
|
||
# 点击下拉框
|
||
await element.click()
|
||
|
||
# 等待选项出现并选择
|
||
option_selector = f"{selector} option[value='{value}']"
|
||
await page.wait_for_selector(option_selector, timeout=5000)
|
||
await page.select_option(selector, value)
|
||
|
||
logger.debug(f"人类化选择完成: {selector} -> {value}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"人类化选择失败: {selector} - {e}")
|
||
return False
|
||
|
||
async def human_drag_and_drop(
|
||
self,
|
||
page: Page,
|
||
source_selector: str,
|
||
target_selector: str,
|
||
delay: Optional[float] = None
|
||
):
|
||
"""
|
||
模拟人类拖拽操作
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
source_selector: 源元素选择器
|
||
target_selector: 目标元素选择器
|
||
delay: 操作前延迟
|
||
"""
|
||
try:
|
||
source_element = await page.wait_for_selector(source_selector, timeout=10000)
|
||
target_element = await page.wait_for_selector(target_selector, timeout=10000)
|
||
|
||
if not source_element or not target_element:
|
||
logger.warning(f"未找到拖拽元素: {source_selector} -> {target_selector}")
|
||
return False
|
||
|
||
# 获取元素位置
|
||
source_bbox = await source_element.bounding_box()
|
||
target_bbox = await target_element.bounding_box()
|
||
|
||
if not source_bbox or not target_bbox:
|
||
logger.error("无法获取元素位置信息")
|
||
return False
|
||
|
||
# 添加随机延迟
|
||
if delay:
|
||
await asyncio.sleep(delay)
|
||
else:
|
||
await asyncio.sleep(random.uniform(*self.random_delay_range))
|
||
|
||
# 开始拖拽
|
||
source_x = source_bbox['x'] + source_bbox['width'] / 2
|
||
source_y = source_bbox['y'] + source_bbox['height'] / 2
|
||
target_x = target_bbox['x'] + target_bbox['width'] / 2
|
||
target_y = target_bbox['y'] + target_bbox['height'] / 2
|
||
|
||
await page.mouse.move(source_x, source_y)
|
||
await page.mouse.down()
|
||
|
||
# 模拟拖拽路径(不是直线)
|
||
steps = 10
|
||
for i in range(1, steps + 1):
|
||
progress = i / steps
|
||
# 添加一些曲线变化
|
||
curve = math.sin(progress * math.pi) * 20
|
||
x = source_x + (target_x - source_x) * progress + random.uniform(-curve, curve)
|
||
y = source_y + (target_y - source_y) * progress + random.uniform(-curve, curve)
|
||
await page.mouse.move(x, y)
|
||
await asyncio.sleep(0.05)
|
||
|
||
await page.mouse.up()
|
||
|
||
logger.debug(f"人类化拖拽完成: {source_selector} -> {target_selector}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"人类化拖拽失败: {e}")
|
||
return False
|
||
|
||
async def random_wait(self, min_seconds: float = 1.0, max_seconds: float = 3.0):
|
||
"""
|
||
随机等待
|
||
|
||
Args:
|
||
min_seconds: 最小等待时间
|
||
max_seconds: 最大等待时间
|
||
"""
|
||
wait_time = random.uniform(min_seconds, max_seconds)
|
||
await asyncio.sleep(wait_time)
|
||
logger.debug(f"随机等待: {wait_time:.2f}秒")
|
||
|
||
async def simulate_reading(self, page: Page, reading_time: float = 5.0):
|
||
"""
|
||
模拟阅读行为
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
reading_time: 阅读时间(秒)
|
||
"""
|
||
try:
|
||
# 在阅读期间进行轻微的滚动和鼠标移动
|
||
start_time = asyncio.get_event_loop().time()
|
||
|
||
while asyncio.get_event_loop().time() - start_time < reading_time:
|
||
# 轻微滚动
|
||
scroll_distance = random.randint(-50, 50)
|
||
await page.mouse.wheel(0, scroll_distance)
|
||
|
||
# 鼠标小幅移动
|
||
viewport = page.viewport_size
|
||
if viewport:
|
||
x = random.randint(100, viewport['width'] - 100)
|
||
y = random.randint(100, viewport['height'] - 100)
|
||
await page.mouse.move(x, y)
|
||
|
||
# 等待随机时间
|
||
await asyncio.sleep(random.uniform(0.5, 1.5))
|
||
|
||
logger.debug(f"模拟阅读完成: {reading_time}秒")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"模拟阅读失败: {e}")
|
||
return False
|
||
|
||
async def random_mouse_movement(self, page: Page, duration: float = 2.0):
|
||
"""
|
||
随机鼠标移动
|
||
|
||
Args:
|
||
page: Playwright页面对象
|
||
duration: 移动持续时间(秒)
|
||
"""
|
||
try:
|
||
viewport = page.viewport_size
|
||
if not viewport:
|
||
return False
|
||
|
||
start_time = asyncio.get_event_loop().time()
|
||
current_x = viewport['width'] / 2
|
||
current_y = viewport['height'] / 2
|
||
|
||
while asyncio.get_event_loop().time() - start_time < duration:
|
||
# 生成随机目标位置
|
||
target_x = random.randint(50, viewport['width'] - 50)
|
||
target_y = random.randint(50, viewport['height'] - 50)
|
||
|
||
# 平滑移动到目标位置
|
||
steps = 20
|
||
for i in range(steps + 1):
|
||
progress = i / steps
|
||
x = current_x + (target_x - current_x) * progress
|
||
y = current_y + (target_y - current_y) * progress
|
||
await page.mouse.move(x, y)
|
||
await asyncio.sleep(0.03)
|
||
|
||
current_x, current_y = target_x, target_y
|
||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||
|
||
logger.debug(f"随机鼠标移动完成: {duration}秒")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"随机鼠标移动失败: {e}")
|
||
return False
|
||
|
||
|
||
# 创建全局实例
|
||
human_behavior = HumanBehaviorSimulator()
|
||
|
||
# 导入math模块用于拖拽中的曲线计算
|
||
import math |