2025-11-12 00:28:07 +08:00

405 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
人类行为模拟工具
模拟真实用户的行为模式,包括打字、点击、滚动等。
"""
import random
import asyncio
from typing import Optional, Tuple, List
from playwright.async_api import Page, Locator, ElementHandle
from ..config.settings import settings
from .logger import get_logger
logger = get_logger(__name__)
class HumanBehaviorSimulator:
"""人类行为模拟器"""
def __init__(self, config=None):
"""
初始化人类行为模拟器
Args:
config: 人类行为配置如果为None则使用默认配置
"""
self.config = config or settings.human_behavior
self.typing_speed = self.config.typing_speed # (min, max) ms/字符
self.pause_probability = self.config.pause_probability
self.random_delay_range = self.config.random_delay_range # (min, max) 秒
self.click_delay_range = self.config.click_delay_range
self.scroll_delay_range = self.config.scroll_delay_range
async def human_type(
self,
page: Page,
selector: str,
text: str,
clear_first: bool = True,
delay: Optional[Tuple[float, float]] = None
):
"""
模拟人类输入文字
Args:
page: Playwright页面对象
selector: 元素选择器
text: 要输入的文本
clear_first: 是否先清空输入框
delay: 自定义打字延迟范围
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到输入元素: {selector}")
return False
# 确保元素可见和可编辑
await element.scroll_into_view_if_needed()
await element.wait_for_element_state("visible")
# 聚焦元素
await element.click()
# 清空输入框
if clear_first:
await element.fill("")
# 分批输入文字,模拟真实打字
words = text.split()
for i, word in enumerate(words):
# 输入单词
for char in word:
char_delay = delay or self.typing_speed
delay_ms = random.randint(*char_delay)
await element.type(char, delay=delay_ms)
# 随机暂停
if random.random() < self.pause_probability:
pause_time = random.uniform(*self.random_delay_range)
await asyncio.sleep(pause_time)
# 如果不是最后一个单词,添加空格
if i < len(words) - 1:
await element.type(" ", delay=random.randint(*self.typing_speed))
logger.debug(f"人类化输入完成: {selector} -> {text[:20]}...")
return True
except Exception as e:
logger.error(f"人类化输入失败: {selector} - {e}")
return False
async def human_click(
self,
page: Page,
selector: str,
position: Optional[Tuple[float, float]] = None,
modifiers: Optional[List[str]] = None,
delay: Optional[float] = None
):
"""
模拟人类点击
Args:
page: Playwright页面对象
selector: 元素选择器
position: 点击位置 (x, y)
modifiers: 修饰键列表 ['Ctrl', 'Shift', 'Alt', 'Meta']
delay: 点击前延迟时间
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到点击元素: {selector}")
return False
# 等待元素可见和可点击
await element.scroll_into_view_if_needed()
await element.wait_for_element_state("visible")
# 添加随机延迟
click_delay = delay or random.uniform(*self.click_delay_range)
await asyncio.sleep(click_delay)
# 模拟鼠标移动到元素位置
bbox = await element.bounding_box()
if bbox:
# 在元素内部随机选择点击位置
if not position:
x = bbox['x'] + bbox['width'] * random.uniform(0.2, 0.8)
y = bbox['y'] + bbox['height'] * random.uniform(0.2, 0.8)
position = (x, y)
# 移动鼠标到点击位置
await page.mouse.move(position[0], position[1])
# 小幅随机移动,模拟真实手部抖动
offset_x = random.uniform(-2, 2)
offset_y = random.uniform(-2, 2)
await page.mouse.move(position[0] + offset_x, position[1] + offset_y)
# 点击
await page.mouse.click(position[0] + offset_x, position[1] + offset_y, modifiers=modifiers or [])
else:
# 如果无法获取边界框,直接点击元素
await element.click(modifiers=modifiers or [])
logger.debug(f"人类化点击完成: {selector}")
return True
except Exception as e:
logger.error(f"人类化点击失败: {selector} - {e}")
return False
async def human_scroll(
self,
page: Page,
distance: int = 300,
direction: str = "down",
steps: int = 3
):
"""
模拟人类滚动
Args:
page: Playwright页面对象
distance: 滚动距离
direction: 滚动方向 ('up', 'down', 'left', 'right')
steps: 滚动步数
"""
try:
if direction not in ['up', 'down', 'left', 'right']:
direction = 'down'
# 计算每步滚动距离
step_distance = distance / steps
for i in range(steps):
if direction == 'down':
await page.mouse.wheel(0, step_distance)
elif direction == 'up':
await page.mouse.wheel(0, -step_distance)
elif direction == 'right':
await page.mouse.wheel(step_distance, 0)
elif direction == 'left':
await page.mouse.wheel(-step_distance, 0)
# 每步之间添加随机延迟
scroll_delay = random.uniform(*self.scroll_delay_range)
await asyncio.sleep(scroll_delay)
logger.debug(f"人类化滚动完成: {direction} {distance}px")
return True
except Exception as e:
logger.error(f"人类化滚动失败: {e}")
return False
async def human_select_option(
self,
page: Page,
selector: str,
value: str,
delay: Optional[float] = None
):
"""
模拟人类选择下拉选项
Args:
page: Playwright页面对象
selector: 下拉框选择器
value: 选项值
delay: 操作前延迟
"""
try:
element = await page.wait_for_selector(selector, timeout=10000)
if not element:
logger.warning(f"未找到下拉框: {selector}")
return False
# 滚动到元素
await element.scroll_into_view_if_needed()
# 添加随机延迟
if delay:
await asyncio.sleep(delay)
else:
await asyncio.sleep(random.uniform(*self.random_delay_range))
# 点击下拉框
await element.click()
# 等待选项出现并选择
option_selector = f"{selector} option[value='{value}']"
await page.wait_for_selector(option_selector, timeout=5000)
await page.select_option(selector, value)
logger.debug(f"人类化选择完成: {selector} -> {value}")
return True
except Exception as e:
logger.error(f"人类化选择失败: {selector} - {e}")
return False
async def human_drag_and_drop(
self,
page: Page,
source_selector: str,
target_selector: str,
delay: Optional[float] = None
):
"""
模拟人类拖拽操作
Args:
page: Playwright页面对象
source_selector: 源元素选择器
target_selector: 目标元素选择器
delay: 操作前延迟
"""
try:
source_element = await page.wait_for_selector(source_selector, timeout=10000)
target_element = await page.wait_for_selector(target_selector, timeout=10000)
if not source_element or not target_element:
logger.warning(f"未找到拖拽元素: {source_selector} -> {target_selector}")
return False
# 获取元素位置
source_bbox = await source_element.bounding_box()
target_bbox = await target_element.bounding_box()
if not source_bbox or not target_bbox:
logger.error("无法获取元素位置信息")
return False
# 添加随机延迟
if delay:
await asyncio.sleep(delay)
else:
await asyncio.sleep(random.uniform(*self.random_delay_range))
# 开始拖拽
source_x = source_bbox['x'] + source_bbox['width'] / 2
source_y = source_bbox['y'] + source_bbox['height'] / 2
target_x = target_bbox['x'] + target_bbox['width'] / 2
target_y = target_bbox['y'] + target_bbox['height'] / 2
await page.mouse.move(source_x, source_y)
await page.mouse.down()
# 模拟拖拽路径(不是直线)
steps = 10
for i in range(1, steps + 1):
progress = i / steps
# 添加一些曲线变化
curve = math.sin(progress * math.pi) * 20
x = source_x + (target_x - source_x) * progress + random.uniform(-curve, curve)
y = source_y + (target_y - source_y) * progress + random.uniform(-curve, curve)
await page.mouse.move(x, y)
await asyncio.sleep(0.05)
await page.mouse.up()
logger.debug(f"人类化拖拽完成: {source_selector} -> {target_selector}")
return True
except Exception as e:
logger.error(f"人类化拖拽失败: {e}")
return False
async def random_wait(self, min_seconds: float = 1.0, max_seconds: float = 3.0):
"""
随机等待
Args:
min_seconds: 最小等待时间
max_seconds: 最大等待时间
"""
wait_time = random.uniform(min_seconds, max_seconds)
await asyncio.sleep(wait_time)
logger.debug(f"随机等待: {wait_time:.2f}")
async def simulate_reading(self, page: Page, reading_time: float = 5.0):
"""
模拟阅读行为
Args:
page: Playwright页面对象
reading_time: 阅读时间(秒)
"""
try:
# 在阅读期间进行轻微的滚动和鼠标移动
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < reading_time:
# 轻微滚动
scroll_distance = random.randint(-50, 50)
await page.mouse.wheel(0, scroll_distance)
# 鼠标小幅移动
viewport = page.viewport_size
if viewport:
x = random.randint(100, viewport['width'] - 100)
y = random.randint(100, viewport['height'] - 100)
await page.mouse.move(x, y)
# 等待随机时间
await asyncio.sleep(random.uniform(0.5, 1.5))
logger.debug(f"模拟阅读完成: {reading_time}")
return True
except Exception as e:
logger.error(f"模拟阅读失败: {e}")
return False
async def random_mouse_movement(self, page: Page, duration: float = 2.0):
"""
随机鼠标移动
Args:
page: Playwright页面对象
duration: 移动持续时间(秒)
"""
try:
viewport = page.viewport_size
if not viewport:
return False
start_time = asyncio.get_event_loop().time()
current_x = viewport['width'] / 2
current_y = viewport['height'] / 2
while asyncio.get_event_loop().time() - start_time < duration:
# 生成随机目标位置
target_x = random.randint(50, viewport['width'] - 50)
target_y = random.randint(50, viewport['height'] - 50)
# 平滑移动到目标位置
steps = 20
for i in range(steps + 1):
progress = i / steps
x = current_x + (target_x - current_x) * progress
y = current_y + (target_y - current_y) * progress
await page.mouse.move(x, y)
await asyncio.sleep(0.03)
current_x, current_y = target_x, target_y
await asyncio.sleep(random.uniform(0.1, 0.3))
logger.debug(f"随机鼠标移动完成: {duration}")
return True
except Exception as e:
logger.error(f"随机鼠标移动失败: {e}")
return False
# 创建全局实例
human_behavior = HumanBehaviorSimulator()
# 导入math模块用于拖拽中的曲线计算
import math