376 lines
14 KiB
Python
376 lines
14 KiB
Python
import time
|
||
import random
|
||
from playwright.sync_api import Page
|
||
from playwright.async_api import Page as AsyncPage
|
||
from .human_like import HumanLikeTyper
|
||
import asyncio
|
||
from typing import Union, Optional, Dict, Any
|
||
|
||
|
||
class HumanTypingWrapper:
|
||
"""
|
||
人类化输入包装器,提供适度的人类化输入功能
|
||
支持同步和异步页面操作,设计更加简洁高效
|
||
"""
|
||
|
||
def __init__(self, page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None):
|
||
self.page = page
|
||
self.is_async = hasattr(page, 'wait_for_timeout') # 检测是否为异步页面
|
||
|
||
# 初始化配置 - 使用更合理的默认值,简化设计
|
||
self.config = self._init_config(config)
|
||
|
||
# 初始化human_like模块(仅用于同步页面)
|
||
if not self.is_async:
|
||
self.human_typer = HumanLikeTyper(page)
|
||
|
||
def _init_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""初始化配置参数
|
||
采用行业标准的人类行为模拟设计:
|
||
- 保持合理的输入速度变化
|
||
- 适度的停顿和错误修正
|
||
- 简化的参数设置,更易维护
|
||
"""
|
||
default_config = {
|
||
# 基础输入速度设置(更符合真实人类输入速度范围)
|
||
'min_typing_speed': 2.0,
|
||
'max_typing_speed': 5.0,
|
||
|
||
# 适度的思考停顿设置
|
||
'pause_probability': 0.1, # 每10个字符左右停顿一次
|
||
'min_pause_duration': 0.2,
|
||
'max_pause_duration': 1.0,
|
||
|
||
# 简化的错误修正设置 - 不要过多干扰正常输入
|
||
'correction_probability': 0.05, # 5%的概率进行错误修正
|
||
|
||
# 基本的点击前后停顿
|
||
'click_delay_before': (0.2, 0.5),
|
||
'click_delay_after': (0.3, 0.7),
|
||
|
||
# 输入完成后的停顿
|
||
'finish_pause_probability': 0.6,
|
||
'finish_pause_duration': (0.5, 1.2),
|
||
|
||
# 分段输入设置 - 更合理的分段长度
|
||
'chunk_input': True,
|
||
'max_chunk_length': 50, # 更长的段落长度,更符合人类阅读习惯
|
||
'chunk_pause_duration': (0.8, 2.0),
|
||
|
||
# 最小延迟设置 - 确保不会有机器特征明显的快速输入
|
||
'min_char_delay': 0.08, # 确保字符间延迟不会过低
|
||
}
|
||
|
||
if config:
|
||
default_config.update(config)
|
||
|
||
return default_config
|
||
|
||
def _calculate_typing_delay(self) -> float:
|
||
"""简化的延迟计算方法
|
||
保持合理的随机变化,但避免过度复杂的计算
|
||
"""
|
||
# 基础延迟计算
|
||
base_speed = random.uniform(
|
||
self.config['min_typing_speed'],
|
||
self.config['max_typing_speed']
|
||
)
|
||
|
||
# 基础延迟(字符间)
|
||
base_delay = 1 / base_speed
|
||
|
||
# 增加一些随机变化,但保持在合理范围内
|
||
variation = random.uniform(0.8, 1.3)
|
||
final_delay = base_delay * variation
|
||
|
||
# 确保最小延迟,避免机器特征
|
||
final_delay = max(self.config['min_char_delay'], final_delay)
|
||
|
||
return final_delay
|
||
|
||
def _should_pause(self) -> bool:
|
||
"""简化的停顿判断逻辑"""
|
||
return random.random() < self.config['pause_probability']
|
||
|
||
def _should_correct(self) -> bool:
|
||
"""判断是否应该进行错误修正"""
|
||
return random.random() < self.config['correction_probability']
|
||
|
||
def _get_pause_duration(self) -> float:
|
||
"""获取随机停顿时长"""
|
||
return random.uniform(
|
||
self.config['min_pause_duration'],
|
||
self.config['max_pause_duration']
|
||
)
|
||
|
||
def _get_wrong_char(self, correct_char: str) -> str:
|
||
"""简化的错误字符生成 - 不需要完整的键盘映射表"""
|
||
# 只保留几个常见的错误类型,减少复杂性
|
||
if correct_char.isalpha():
|
||
# 简单的字母错误(前后移位)
|
||
char_code = ord(correct_char.lower())
|
||
if char_code > ord('a') and char_code < ord('z'):
|
||
# 随机前后移位
|
||
wrong_offset = random.choice([-1, 1])
|
||
wrong_char = chr(char_code + wrong_offset)
|
||
return wrong_char.upper() if correct_char.isupper() else wrong_char
|
||
# 默认返回一个简单的错误字符
|
||
return random.choice('abcdefghijklmnopqrstuvwxyz')
|
||
|
||
def _record_action(self, action_type: str):
|
||
"""空实现的_record_action方法,保持兼容性"""
|
||
pass
|
||
|
||
async def _sleep(self, duration: float):
|
||
"""统一的睡眠方法,兼容同步和异步"""
|
||
if self.is_async:
|
||
await asyncio.sleep(duration)
|
||
else:
|
||
time.sleep(duration)
|
||
|
||
async def _type_char(self, char: str, delay: float):
|
||
"""简化的字符输入方法"""
|
||
if self.is_async:
|
||
await self.page.keyboard.type(char, delay=delay * 1000)
|
||
else:
|
||
self.page.keyboard.type(char, delay=delay * 1000)
|
||
|
||
# 简单的微小停顿,增加自然感
|
||
await self._sleep(random.uniform(0.02, 0.05))
|
||
|
||
async def _press_key(self, key: str):
|
||
"""按键操作,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.keyboard.press(key)
|
||
else:
|
||
self.page.keyboard.press(key)
|
||
|
||
# 记录按键动作
|
||
self._record_action(f"key_{key}")
|
||
|
||
async def _click_element(self, selector: str):
|
||
"""点击元素,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.click(selector)
|
||
else:
|
||
self.page.click(selector)
|
||
|
||
# 记录点击动作
|
||
self._record_action("click")
|
||
|
||
async def _wait_for_selector(self, selector: str, timeout: int = 30000):
|
||
"""等待选择器,兼容同步和异步"""
|
||
if self.is_async:
|
||
await self.page.wait_for_selector(selector, timeout=timeout)
|
||
else:
|
||
self.page.wait_for_selector(selector, timeout=timeout)
|
||
|
||
async def _perform_random_action(self):
|
||
"""简化的随机动作执行 - 不需要过多的动作类型"""
|
||
# 简单的微小停顿就足够模拟人类行为的不确定性
|
||
await self._sleep(random.uniform(0.1, 0.3))
|
||
|
||
async def type_text_human(self, selector: str, text: str, clear_first: bool = True) -> bool:
|
||
"""
|
||
以人类化方式在指定元素中输入文本
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
text: 要输入的文本
|
||
clear_first: 是否先清空现有内容
|
||
|
||
Returns:
|
||
bool: 是否成功输入
|
||
"""
|
||
try:
|
||
# 重置历史记录
|
||
self.last_delays = []
|
||
self.action_history = []
|
||
self.input_timestamps = []
|
||
|
||
# 等待元素并点击
|
||
await self._wait_for_selector(selector)
|
||
|
||
# 点击前停顿 - 增加随机性和持续时间
|
||
delay_before = random.uniform(*self.config['click_delay_before'])
|
||
await self._sleep(delay_before)
|
||
|
||
# 模拟光标移动到元素的过程
|
||
await self._sleep(random.uniform(0.1, 0.3))
|
||
|
||
# 点击元素
|
||
await self._click_element(selector)
|
||
|
||
# 点击后停顿 - 增加随机性和持续时间
|
||
delay_after = random.uniform(*self.config['click_delay_after'])
|
||
await self._sleep(delay_after)
|
||
|
||
# 清空现有内容 - 更自然的清空操作,使用更长的延迟
|
||
if clear_first:
|
||
# 增加随机延迟,使清空操作更自然
|
||
await self._sleep(random.uniform(0.2, 0.4))
|
||
await self._press_key("Control+A")
|
||
await self._sleep(random.uniform(0.15, 0.3))
|
||
await self._press_key("Delete")
|
||
await self._sleep(random.uniform(0.3, 0.6))
|
||
|
||
# 分段输入长文本
|
||
if self.config['chunk_input'] and len(text) > self.config['max_chunk_length']:
|
||
await self._type_text_in_chunks(text)
|
||
else:
|
||
await self._type_text_continuously(text)
|
||
|
||
# 输入完成后可能停顿 - 更自然的结束停顿
|
||
if random.random() < self.config['finish_pause_probability']:
|
||
pause_duration = random.uniform(*self.config['finish_pause_duration'])
|
||
await self._sleep(pause_duration)
|
||
|
||
# 输入完成后的最终停顿
|
||
await self._sleep(random.uniform(0.2, 0.5))
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"人类化输入失败: {e}")
|
||
return False
|
||
|
||
async def _type_text_continuously(self, text: str):
|
||
"""简化的连续输入方法
|
||
保留必要的人类特征,但避免过度复杂化
|
||
"""
|
||
for char in text:
|
||
# 合理的停顿思考
|
||
if self._should_pause():
|
||
pause_duration = self._get_pause_duration()
|
||
await self._sleep(pause_duration)
|
||
|
||
# 适度的错误修正
|
||
if random.random() < self.config['correction_probability']:
|
||
# 简化的错误修正
|
||
wrong_char = self._get_wrong_char(char)
|
||
await self._type_char(wrong_char, self._calculate_typing_delay())
|
||
await self._sleep(random.uniform(0.1, 0.3)) # 发现错误后的短暂停顿
|
||
await self._press_key("Backspace") # 删除错误字符
|
||
await self._sleep(random.uniform(0.1, 0.2)) # 删除后的停顿
|
||
|
||
# 计算延迟并输入字符
|
||
delay = self._calculate_typing_delay()
|
||
await self._type_char(char, delay)
|
||
|
||
async def _type_text_in_chunks(self, text: str):
|
||
"""简化的分段输入方法
|
||
保持基本的段落分割逻辑,但简化实现"""
|
||
chunk_size = self.config['max_chunk_length']
|
||
|
||
# 简单的分段处理
|
||
for i in range(0, len(text), chunk_size):
|
||
# 获取当前段落,稍微随机化长度
|
||
actual_chunk_size = min(chunk_size, len(text) - i)
|
||
|
||
# 如果不是最后一段,尽量在空格处分段
|
||
if i + actual_chunk_size < len(text):
|
||
# 寻找段落内最近的空格
|
||
space_pos = text.rfind(' ', i, i + actual_chunk_size + 1)
|
||
if space_pos > i:
|
||
actual_chunk_size = space_pos - i + 1
|
||
|
||
# 输入当前段落
|
||
chunk = text[i:i + actual_chunk_size]
|
||
await self._type_text_continuously(chunk)
|
||
|
||
# 段落间停顿
|
||
if i + actual_chunk_size < len(text):
|
||
pause_duration = random.uniform(*self.config['chunk_pause_duration'])
|
||
await self._sleep(pause_duration)
|
||
|
||
# 移除了专门的_simul ate_correction和_simul ate_backspace_retype方法
|
||
# 将这些功能合并到_type_text_continuously中,简化实现
|
||
|
||
async def click_and_type(self, selector: str, text: str, **kwargs) -> bool:
|
||
"""
|
||
点击元素并输入文本的便捷方法
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
text: 要输入的文本
|
||
**kwargs: 传递给type_text_human的额外参数
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
return await self.type_text_human(selector, text, **kwargs)
|
||
|
||
async def human_click(self, selector: str, wait_after: bool = True) -> bool:
|
||
"""
|
||
人类化点击元素
|
||
|
||
Args:
|
||
selector: 元素选择器
|
||
wait_after: 点击后是否等待
|
||
|
||
Returns:
|
||
bool: 是否成功点击
|
||
"""
|
||
try:
|
||
await self._wait_for_selector(selector)
|
||
|
||
# 点击前停顿 - 增加随机性
|
||
delay_before = random.uniform(*self.config['click_delay_before'])
|
||
await self._sleep(delay_before)
|
||
|
||
# 点击
|
||
await self._click_element(selector)
|
||
|
||
# 点击后停顿
|
||
if wait_after:
|
||
delay_after = random.uniform(*self.config['click_delay_after'])
|
||
await self._sleep(delay_after)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"人类化点击失败: {e}")
|
||
return False
|
||
|
||
async def human_scroll(self, direction: str = "down", amount: int = 3):
|
||
"""
|
||
人类化滚动页面
|
||
|
||
Args:
|
||
direction: 滚动方向 "up" 或 "down"
|
||
amount: 滚动次数
|
||
"""
|
||
key = "PageDown" if direction == "down" else "PageUp"
|
||
|
||
for i in range(amount):
|
||
await self._press_key(key)
|
||
# 滚动间隔 - 增加随机性和一致性
|
||
pause = random.uniform(0.5, 1.2) # 增加滚动间隔
|
||
await self._sleep(pause)
|
||
|
||
# 记录滚动动作
|
||
self._record_action(f"scroll_{direction}")
|
||
|
||
def update_config(self, new_config: Dict[str, Any]):
|
||
"""更新配置"""
|
||
self.config.update(new_config)
|
||
|
||
def get_config(self) -> Dict[str, Any]:
|
||
"""获取当前配置"""
|
||
return self.config.copy()
|
||
|
||
|
||
# 便捷的工厂函数
|
||
def create_human_typer(page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None) -> HumanTypingWrapper:
|
||
"""
|
||
创建人类化输入包装器的工厂函数
|
||
|
||
Args:
|
||
page: Playwright页面对象(同步或异步)
|
||
config: 可选的配置字典
|
||
|
||
Returns:
|
||
HumanTypingWrapper实例
|
||
"""
|
||
return HumanTypingWrapper(page, config)
|