405 lines
14 KiB
Python
Raw Permalink Normal View History

2025-11-12 00:28:07 +08:00
"""
人类行为模拟工具
模拟真实用户的行为模式包括打字点击滚动等
"""
import random
import asyncio
from typing import Optional, Tuple, List
from playwright.async_api import Page, Locator, ElementHandle
from ..config.settings import settings
from .logger import get_logger
logger = get_logger(__name__)
class HumanBehaviorSimulator:
"""人类行为模拟器"""
def __init__(self, config=None):
"""
初始化人类行为模拟器
Args:
config: 人类行为配置如果为None则使用默认配置
"""
self.config = config or settings.human_behavior
self.typing_speed = self.config.typing_speed # (min, max) ms/字符
self.pause_probability = self.config.pause_probability
self.random_delay_range = self.config.random_delay_range # (min, max) 秒
self.click_delay_range = self.config.click_delay_range
self.scroll_delay_range = self.config.scroll_delay_range
async def human_type(
self,
page: Page,
selector: str,
text: str,
clear_first: bool = True,
delay: Optional[Tuple[float, float]] = None
):
"""
模拟人类输入文字
Args:
page: Playwright页面对象
selector: 元素选择器
text: 要输入的文本
clear_first: 是否先清空输入框
delay: 自定义打字延迟范围
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到输入元素: {selector}")
return False
# 确保元素可见和可编辑
await element.scroll_into_view_if_needed()
await element.wait_for_element_state("visible")
# 聚焦元素
await element.click()
# 清空输入框
if clear_first:
await element.fill("")
# 分批输入文字,模拟真实打字
words = text.split()
for i, word in enumerate(words):
# 输入单词
for char in word:
char_delay = delay or self.typing_speed
delay_ms = random.randint(*char_delay)
await element.type(char, delay=delay_ms)
# 随机暂停
if random.random() < self.pause_probability:
pause_time = random.uniform(*self.random_delay_range)
await asyncio.sleep(pause_time)
# 如果不是最后一个单词,添加空格
if i < len(words) - 1:
await element.type(" ", delay=random.randint(*self.typing_speed))
logger.debug(f"人类化输入完成: {selector} -> {text[:20]}...")
return True
except Exception as e:
logger.error(f"人类化输入失败: {selector} - {e}")
return False
async def human_click(
self,
page: Page,
selector: str,
position: Optional[Tuple[float, float]] = None,
modifiers: Optional[List[str]] = None,
delay: Optional[float] = None
):
"""
模拟人类点击
Args:
page: Playwright页面对象
selector: 元素选择器
position: 点击位置 (x, y)
modifiers: 修饰键列表 ['Ctrl', 'Shift', 'Alt', 'Meta']
delay: 点击前延迟时间
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到点击元素: {selector}")
return False
# 等待元素可见和可点击
await element.scroll_into_view_if_needed()
await element.wait_for_element_state("visible")
# 添加随机延迟
click_delay = delay or random.uniform(*self.click_delay_range)
await asyncio.sleep(click_delay)
# 模拟鼠标移动到元素位置
bbox = await element.bounding_box()
if bbox:
# 在元素内部随机选择点击位置
if not position:
x = bbox['x'] + bbox['width'] * random.uniform(0.2, 0.8)
y = bbox['y'] + bbox['height'] * random.uniform(0.2, 0.8)
position = (x, y)
# 移动鼠标到点击位置
await page.mouse.move(position[0], position[1])
# 小幅随机移动,模拟真实手部抖动
offset_x = random.uniform(-2, 2)
offset_y = random.uniform(-2, 2)
await page.mouse.move(position[0] + offset_x, position[1] + offset_y)
# 点击
await page.mouse.click(position[0] + offset_x, position[1] + offset_y, modifiers=modifiers or [])
else:
# 如果无法获取边界框,直接点击元素
await element.click(modifiers=modifiers or [])
logger.debug(f"人类化点击完成: {selector}")
return True
except Exception as e:
logger.error(f"人类化点击失败: {selector} - {e}")
return False
async def human_scroll(
self,
page: Page,
distance: int = 300,
direction: str = "down",
steps: int = 3
):
"""
模拟人类滚动
Args:
page: Playwright页面对象
distance: 滚动距离
direction: 滚动方向 ('up', 'down', 'left', 'right')
steps: 滚动步数
"""
try:
if direction not in ['up', 'down', 'left', 'right']:
direction = 'down'
# 计算每步滚动距离
step_distance = distance / steps
for i in range(steps):
if direction == 'down':
await page.mouse.wheel(0, step_distance)
elif direction == 'up':
await page.mouse.wheel(0, -step_distance)
elif direction == 'right':
await page.mouse.wheel(step_distance, 0)
elif direction == 'left':
await page.mouse.wheel(-step_distance, 0)
# 每步之间添加随机延迟
scroll_delay = random.uniform(*self.scroll_delay_range)
await asyncio.sleep(scroll_delay)
logger.debug(f"人类化滚动完成: {direction} {distance}px")
return True
except Exception as e:
logger.error(f"人类化滚动失败: {e}")
return False
async def human_select_option(
self,
page: Page,
selector: str,
value: str,
delay: Optional[float] = None
):
"""
模拟人类选择下拉选项
Args:
page: Playwright页面对象
selector: 下拉框选择器
value: 选项值
delay: 操作前延迟
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到下拉框: {selector}")
return False
# 滚动到元素
await element.scroll_into_view_if_needed()
# 添加随机延迟
if delay:
await asyncio.sleep(delay)
else:
await asyncio.sleep(random.uniform(*self.random_delay_range))
# 点击下拉框
await element.click()
# 等待选项出现并选择
option_selector = f"{selector} option[value='{value}']"
await page.wait_for_selector(option_selector, timeout=5000)
await page.select_option(selector, value)
logger.debug(f"人类化选择完成: {selector} -> {value}")
return True
except Exception as e:
logger.error(f"人类化选择失败: {selector} - {e}")
return False
async def human_drag_and_drop(
self,
page: Page,
source_selector: str,
target_selector: str,
delay: Optional[float] = None
):
"""
模拟人类拖拽操作
Args:
page: Playwright页面对象
source_selector: 源元素选择器
target_selector: 目标元素选择器
delay: 操作前延迟
"""
try:
source_element = await page.wait_for_selector(source_selector, timeout=10000)
target_element = await page.wait_for_selector(target_selector, timeout=10000)
if not source_element or not target_element:
logger.warning(f"未找到拖拽元素: {source_selector} -> {target_selector}")
return False
# 获取元素位置
source_bbox = await source_element.bounding_box()
target_bbox = await target_element.bounding_box()
if not source_bbox or not target_bbox:
logger.error("无法获取元素位置信息")
return False
# 添加随机延迟
if delay:
await asyncio.sleep(delay)
else:
await asyncio.sleep(random.uniform(*self.random_delay_range))
# 开始拖拽
source_x = source_bbox['x'] + source_bbox['width'] / 2
source_y = source_bbox['y'] + source_bbox['height'] / 2
target_x = target_bbox['x'] + target_bbox['width'] / 2
target_y = target_bbox['y'] + target_bbox['height'] / 2
await page.mouse.move(source_x, source_y)
await page.mouse.down()
# 模拟拖拽路径(不是直线)
steps = 10
for i in range(1, steps + 1):
progress = i / steps
# 添加一些曲线变化
curve = math.sin(progress * math.pi) * 20
x = source_x + (target_x - source_x) * progress + random.uniform(-curve, curve)
y = source_y + (target_y - source_y) * progress + random.uniform(-curve, curve)
await page.mouse.move(x, y)
await asyncio.sleep(0.05)
await page.mouse.up()
logger.debug(f"人类化拖拽完成: {source_selector} -> {target_selector}")
return True
except Exception as e:
logger.error(f"人类化拖拽失败: {e}")
return False
async def random_wait(self, min_seconds: float = 1.0, max_seconds: float = 3.0):
"""
随机等待
Args:
min_seconds: 最小等待时间
max_seconds: 最大等待时间
"""
wait_time = random.uniform(min_seconds, max_seconds)
await asyncio.sleep(wait_time)
logger.debug(f"随机等待: {wait_time:.2f}")
async def simulate_reading(self, page: Page, reading_time: float = 5.0):
"""
模拟阅读行为
Args:
page: Playwright页面对象
reading_time: 阅读时间
"""
try:
# 在阅读期间进行轻微的滚动和鼠标移动
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < reading_time:
# 轻微滚动
scroll_distance = random.randint(-50, 50)
await page.mouse.wheel(0, scroll_distance)
# 鼠标小幅移动
viewport = page.viewport_size
if viewport:
x = random.randint(100, viewport['width'] - 100)
y = random.randint(100, viewport['height'] - 100)
await page.mouse.move(x, y)
# 等待随机时间
await asyncio.sleep(random.uniform(0.5, 1.5))
logger.debug(f"模拟阅读完成: {reading_time}")
return True
except Exception as e:
logger.error(f"模拟阅读失败: {e}")
return False
async def random_mouse_movement(self, page: Page, duration: float = 2.0):
"""
随机鼠标移动
Args:
page: Playwright页面对象
duration: 移动持续时间
"""
try:
viewport = page.viewport_size
if not viewport:
return False
start_time = asyncio.get_event_loop().time()
current_x = viewport['width'] / 2
current_y = viewport['height'] / 2
while asyncio.get_event_loop().time() - start_time < duration:
# 生成随机目标位置
target_x = random.randint(50, viewport['width'] - 50)
target_y = random.randint(50, viewport['height'] - 50)
# 平滑移动到目标位置
steps = 20
for i in range(steps + 1):
progress = i / steps
x = current_x + (target_x - current_x) * progress
y = current_y + (target_y - current_y) * progress
await page.mouse.move(x, y)
await asyncio.sleep(0.03)
current_x, current_y = target_x, target_y
await asyncio.sleep(random.uniform(0.1, 0.3))
logger.debug(f"随机鼠标移动完成: {duration}")
return True
except Exception as e:
logger.error(f"随机鼠标移动失败: {e}")
return False
# 创建全局实例
human_behavior = HumanBehaviorSimulator()
# 导入math模块用于拖拽中的曲线计算
import math