363 lines
13 KiB
Python
363 lines
13 KiB
Python
import time
|
||
import random
|
||
from playwright.sync_api import Page
|
||
from playwright.async_api import Page as AsyncPage
|
||
from .human_like import HumanLikeTyper
|
||
import asyncio
|
||
from typing import Union, Optional, List, Dict, Any
|
||
|
||
|
||
class HumanTypingWrapper:
|
||
"""
|
||
人类化输入包装器,提供更高级的人类化输入功能
|
||
支持同步和异步页面操作
|
||
"""
|
||
|
||
def __init__(self, page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None):
|
||
self.page = page
|
||
self.is_async = hasattr(page, 'wait_for_timeout') # 检测是否为异步页面
|
||
|
||
# 初始化配置
|
||
self.config = self._init_config(config)
|
||
|
||
# 初始化human_like模块(仅用于同步页面)
|
||
if not self.is_async:
|
||
self.human_typer = HumanLikeTyper(page)
|
||
|
||
def _init_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""初始化配置参数"""
|
||
default_config = {
|
||
# 输入速度设置(字符/秒)
|
||
'min_typing_speed': 2,
|
||
'max_typing_speed': 10,
|
||
|
||
# 思考停顿设置
|
||
'pause_probability': 0.15, # 15%的概率停顿
|
||
'min_pause_duration': 0.3,
|
||
'max_pause_duration': 2.5,
|
||
|
||
# 错误修正设置
|
||
'correction_probability': 0.0, # 禁用错误修正,确保文字准确性
|
||
'backspace_probability': 0.0, # 禁用退格重输,确保文字准确性
|
||
|
||
# 点击前后停顿
|
||
'click_delay_before': (0.1, 0.4),
|
||
'click_delay_after': (0.2, 0.6),
|
||
|
||
# 输入完成后停顿
|
||
'finish_pause_probability': 0.4, # 40%的概率在输入完成后停顿
|
||
'finish_pause_duration': (0.3, 1.2),
|
||
|
||
# 分段输入设置(长文本分段输入)
|
||
'chunk_input': True,
|
||
'max_chunk_length': 50,
|
||
'chunk_pause_duration': (0.5, 1.5),
|
||
|
||
# 模拟疲劳效果
|
||
'fatigue_effect': True,
|
||
'fatigue_threshold': 100, # 输入超过100个字符后开始疲劳
|
||
'fatigue_slowdown': 0.3, # 疲劳后速度减慢30%
|
||
}
|
||
|
||
if config:
|
||
default_config.update(config)
|
||
|
||
return default_config
|
||
|
||
def _calculate_typing_delay(self, char_index: int = 0) -> float:
|
||
"""计算每个字符之间的延迟时间,考虑疲劳效果"""
|
||
base_speed = random.uniform(
|
||
self.config['min_typing_speed'],
|
||
self.config['max_typing_speed']
|
||
)
|
||
|
||
# 计算疲劳效果
|
||
if (self.config['fatigue_effect'] and
|
||
char_index > self.config['fatigue_threshold']):
|
||
fatigue_factor = 1 + self.config['fatigue_slowdown']
|
||
base_speed /= fatigue_factor
|
||
|
||
delay = 1 / base_speed + random.uniform(-0.05, 0.15)
|
||
return max(0.01, delay) # 确保延迟不为负数
|
||
|
||
def _should_pause(self) -> bool:
|
||
"""判断是否应该停顿思考"""
|
||
return random.random() < self.config['pause_probability']
|
||
|
||
def _should_correct(self) -> bool:
|
||
"""判断是否应该进行错误修正"""
|
||
return random.random() < self.config['correction_probability']
|
||
|
||
def _should_backspace(self) -> bool:
|
||
"""判断是否应该退格重新输入"""
|
||
return random.random() < self.config['backspace_probability']
|
||
|
||
def _get_random_pause_duration(self) -> float:
|
||
"""获取随机停顿时长"""
|
||
return random.uniform(
|
||
self.config['min_pause_duration'],
|
||
self.config['max_pause_duration']
|
||
)
|
||
|
||
def _get_wrong_char(self, correct_char: str) -> str:
|
||
"""生成错误字符"""
|
||
# 键盘相邻字符映射
|
||
keyboard_neighbors = {
|
||
'a': 'sqw', 'b': 'vghn', 'c': 'xdfv', 'd': 'erfcxs', 'e': 'wsdr',
|
||
'f': 'rtgvcd', 'g': 'tyhbvf', 'h': 'yujnbg', 'i': 'ujko', 'j': 'uikmnh',
|
||
'k': 'iolmj', 'l': 'opmk', 'm': 'njk', 'n': 'bhjm', 'o': 'iklp',
|
||
'p': 'ol', 'q': 'wa', 'r': 'edft', 's': 'awedxz', 't': 'rfgy',
|
||
'u': 'yhij', 'v': 'cfgb', 'w': 'qase', 'x': 'zsdc', 'y': 'tghu',
|
||
'z': 'asx'
|
||
}
|
||
|
||
char_lower = correct_char.lower()
|
||
if char_lower in keyboard_neighbors:
|
||
neighbors = keyboard_neighbors[char_lower]
|
||
wrong_char = random.choice(neighbors)
|
||
# 保持原始大小写
|
||
return wrong_char.upper() if correct_char.isupper() else wrong_char
|
||
else:
|
||
# 如果不在映射中,返回随机字符
|
||
return random.choice('abcdefghijklmnopqrstuvwxyz')
|
||
|
||
async def _sleep(self, duration: float):
|
||
"""统一的睡眠方法,兼容同步和异步"""
|
||
if self.is_async:
|
||
await asyncio.sleep(duration)
|
||
else:
|
||
time.sleep(duration)
|
||
|
||
async def _type_char(self, char: str, delay: float):
|
||
"""输入单个字符,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.keyboard.type(char, delay=delay * 1000)
|
||
else:
|
||
self.page.keyboard.type(char, delay=delay * 1000)
|
||
|
||
async def _press_key(self, key: str):
|
||
"""按键操作,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.keyboard.press(key)
|
||
else:
|
||
self.page.keyboard.press(key)
|
||
|
||
async def _click_element(self, selector: str):
|
||
"""点击元素,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.click(selector)
|
||
else:
|
||
self.page.click(selector)
|
||
|
||
async def _wait_for_selector(self, selector: str, timeout: int = 30000):
|
||
"""等待选择器,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.wait_for_selector(selector, timeout=timeout)
|
||
else:
|
||
self.page.wait_for_selector(selector, timeout=timeout)
|
||
|
||
async def type_text_human(self, selector: str, text: str, clear_first: bool = True) -> bool:
|
||
"""
|
||
以人类化方式在指定元素中输入文本
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
text: 要输入的文本
|
||
clear_first: 是否先清空现有内容
|
||
|
||
Returns:
|
||
bool: 是否成功输入
|
||
"""
|
||
try:
|
||
# 等待元素并点击
|
||
await self._wait_for_selector(selector)
|
||
|
||
# 点击前停顿
|
||
delay_before = random.uniform(*self.config['click_delay_before'])
|
||
await self._sleep(delay_before)
|
||
|
||
# 点击元素
|
||
await self._click_element(selector)
|
||
|
||
# 点击后停顿
|
||
delay_after = random.uniform(*self.config['click_delay_after'])
|
||
await self._sleep(delay_after)
|
||
|
||
# 清空现有内容
|
||
if clear_first:
|
||
await self._press_key("Control+A")
|
||
await self._sleep(0.1)
|
||
await self._press_key("Delete")
|
||
await self._sleep(0.2)
|
||
|
||
# 分段输入长文本
|
||
if self.config['chunk_input'] and len(text) > self.config['max_chunk_length']:
|
||
await self._type_text_in_chunks(text)
|
||
else:
|
||
await self._type_text_continuously(text)
|
||
|
||
# 输入完成后可能停顿
|
||
if random.random() < self.config['finish_pause_probability']:
|
||
pause_duration = random.uniform(*self.config['finish_pause_duration'])
|
||
await self._sleep(pause_duration)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"人类化输入失败: {e}")
|
||
return False
|
||
|
||
async def _type_text_continuously(self, text: str):
|
||
"""连续输入文本,包含人类化效果"""
|
||
current_input = ""
|
||
|
||
for i, char in enumerate(text):
|
||
# 随机停顿思考
|
||
if self._should_pause():
|
||
pause_duration = self._get_random_pause_duration()
|
||
await self._sleep(pause_duration)
|
||
|
||
# 随机错误修正
|
||
if current_input and self._should_correct():
|
||
await self._simulate_correction(char)
|
||
|
||
# 随机退格重新输入
|
||
if current_input and self._should_backspace():
|
||
await self._simulate_backspace_retype(char)
|
||
|
||
# 计算延迟并输入字符
|
||
delay = self._calculate_typing_delay(i)
|
||
await self._type_char(char, delay)
|
||
|
||
current_input += char
|
||
|
||
# 字符间微小停顿
|
||
micro_pause = random.uniform(0.01, 0.08)
|
||
await self._sleep(micro_pause)
|
||
|
||
async def _type_text_in_chunks(self, text: str):
|
||
"""分段输入长文本"""
|
||
chunk_size = self.config['max_chunk_length']
|
||
chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
|
||
|
||
for i, chunk in enumerate(chunks):
|
||
await self._type_text_continuously(chunk)
|
||
|
||
# 段落间停顿(除了最后一段)
|
||
if i < len(chunks) - 1:
|
||
pause_duration = random.uniform(*self.config['chunk_pause_duration'])
|
||
await self._sleep(pause_duration)
|
||
|
||
async def _simulate_correction(self, next_char: str):
|
||
"""模拟输入错误并修正"""
|
||
# 删除最后一个字符
|
||
await self._press_key("Backspace")
|
||
await self._sleep(random.uniform(0.1, 0.4))
|
||
|
||
# 输入错误字符
|
||
wrong_char = self._get_wrong_char(next_char)
|
||
delay = self._calculate_typing_delay()
|
||
await self._type_char(wrong_char, delay)
|
||
await self._sleep(random.uniform(0.2, 0.5))
|
||
|
||
# 删除错误字符
|
||
await self._press_key("Backspace")
|
||
await self._sleep(random.uniform(0.1, 0.3))
|
||
|
||
async def _simulate_backspace_retype(self, target_char: str):
|
||
"""模拟退格后重新输入"""
|
||
backspace_count = random.randint(1, min(3, len(target_char)))
|
||
|
||
# 执行退格
|
||
for _ in range(backspace_count):
|
||
await self._press_key("Backspace")
|
||
await self._sleep(random.uniform(0.1, 0.2))
|
||
|
||
# 短暂停顿
|
||
await self._sleep(random.uniform(0.2, 0.6))
|
||
|
||
async def click_and_type(self, selector: str, text: str, **kwargs) -> bool:
|
||
"""
|
||
点击元素并输入文本的便捷方法
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
text: 要输入的文本
|
||
**kwargs: 传递给type_text_human的额外参数
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
return await self.type_text_human(selector, text, **kwargs)
|
||
|
||
async def human_click(self, selector: str, wait_after: bool = True) -> bool:
|
||
"""
|
||
人类化点击元素
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
wait_after: 点击后是否等待
|
||
|
||
Returns:
|
||
bool: 是否成功点击
|
||
"""
|
||
try:
|
||
await self._wait_for_selector(selector)
|
||
|
||
# 点击前停顿
|
||
delay_before = random.uniform(*self.config['click_delay_before'])
|
||
await self._sleep(delay_before)
|
||
|
||
# 点击
|
||
await self._click_element(selector)
|
||
|
||
# 点击后停顿
|
||
if wait_after:
|
||
delay_after = random.uniform(*self.config['click_delay_after'])
|
||
await self._sleep(delay_after)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"人类化点击失败: {e}")
|
||
return False
|
||
|
||
async def human_scroll(self, direction: str = "down", amount: int = 3):
|
||
"""
|
||
人类化滚动页面
|
||
|
||
Args:
|
||
direction: 滚动方向 "up" 或 "down"
|
||
amount: 滚动次数
|
||
"""
|
||
key = "PageDown" if direction == "down" else "PageUp"
|
||
|
||
for i in range(amount):
|
||
await self._press_key(key)
|
||
# 滚动间隔
|
||
pause = random.uniform(0.3, 0.8)
|
||
await self._sleep(pause)
|
||
|
||
def update_config(self, new_config: Dict[str, Any]):
|
||
"""更新配置"""
|
||
self.config.update(new_config)
|
||
|
||
def get_config(self) -> Dict[str, Any]:
|
||
"""获取当前配置"""
|
||
return self.config.copy()
|
||
|
||
|
||
# 便捷的工厂函数
|
||
def create_human_typer(page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None) -> HumanTypingWrapper:
|
||
"""
|
||
创建人类化输入包装器的工厂函数
|
||
|
||
Args:
|
||
page: Playwright页面对象(同步或异步)
|
||
config: 可选的配置字典
|
||
|
||
Returns:
|
||
HumanTypingWrapper实例
|
||
"""
|
||
return HumanTypingWrapper(page, config)
|