autoUpload/utils/human_typing_wrapper.py

376 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
from playwright.sync_api import Page
from playwright.async_api import Page as AsyncPage
from .human_like import HumanLikeTyper
import asyncio
from typing import Union, Optional, Dict, Any
class HumanTypingWrapper:
    """Human-like input wrapper around a Playwright page.

    Adds randomized delays, occasional "thinking" pauses and occasional
    typo-then-backspace corrections to clicking, typing and scrolling.
    The public methods are coroutines; every page call is dispatched to
    either the awaited (async page) or the direct (sync page) form
    depending on ``self.is_async``.
    """

    # Alphabet used when no keyboard-neighbour typo can be derived.
    _FALLBACK_CHARS = 'abcdefghijklmnopqrstuvwxyz'

    def __init__(self, page: "Union[Page, AsyncPage]", config: Optional[Dict[str, Any]] = None):
        """Bind the wrapper to ``page`` and build the effective configuration.

        Args:
            page: Playwright page object, sync or async flavour.
            config: Optional overrides merged on top of the defaults
                (see ``_init_config`` for the recognised keys).
        """
        self.page = page
        # Both Playwright page flavours expose ``wait_for_timeout``, so mere
        # attribute presence cannot tell them apart; check whether it is a
        # coroutine function instead.
        self.is_async = self._is_async_page(page)
        self.config = self._init_config(config)
        # Bookkeeping lists; they are reset at the start of type_text_human().
        self.last_delays = []
        self.action_history = []
        self.input_timestamps = []
        # The human_like helper is only created for sync pages.
        if not self.is_async:
            self.human_typer = HumanLikeTyper(page)

    @staticmethod
    def _is_async_page(page: Any) -> bool:
        """Return True when ``page.wait_for_timeout`` is a coroutine function."""
        import inspect  # local import: keeps this block self-contained

        return inspect.iscoroutinefunction(getattr(page, 'wait_for_timeout', None))

    def _init_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the default configuration updated with ``config``.

        Args:
            config: Optional mapping of overrides; ``None`` or an empty
                mapping leaves the defaults untouched.

        Returns:
            A new dict holding the effective settings.
        """
        default_config = {
            # Typing speed range in characters per second
            # (per-character delay is 1 / speed seconds).
            'min_typing_speed': 2.0,
            'max_typing_speed': 5.0,
            # "Thinking" pauses: per-character probability and duration (seconds).
            'pause_probability': 0.1,
            'min_pause_duration': 0.2,
            'max_pause_duration': 1.0,
            # Per-character probability of typing a wrong key and fixing it.
            'correction_probability': 0.05,
            # (min, max) seconds to wait before / after a click.
            'click_delay_before': (0.2, 0.5),
            'click_delay_after': (0.3, 0.7),
            # Optional extra pause once typing has finished.
            'finish_pause_probability': 0.6,
            'finish_pause_duration': (0.5, 1.2),
            # Chunked input for texts longer than max_chunk_length characters.
            'chunk_input': True,
            'max_chunk_length': 50,
            'chunk_pause_duration': (0.8, 2.0),
            # Lower bound (seconds) for the delay between characters.
            'min_char_delay': 0.08,
        }
        if config:
            default_config.update(config)
        return default_config

    def _calculate_typing_delay(self) -> float:
        """Return a randomized per-character delay in seconds.

        A speed is drawn from the configured range, inverted into a delay,
        jittered by a factor in [0.8, 1.3] and clamped from below by
        ``min_char_delay``.
        """
        min_delay = self.config['min_char_delay']
        speed = random.uniform(
            self.config['min_typing_speed'],
            self.config['max_typing_speed'],
        )
        if speed <= 0:
            # A non-positive configured speed cannot be inverted; fall back
            # to the floor instead of raising ZeroDivisionError.
            return min_delay
        jitter = random.uniform(0.8, 1.3)
        return max(min_delay, (1 / speed) * jitter)

    def _should_pause(self) -> bool:
        """Decide randomly whether to insert a thinking pause."""
        return random.random() < self.config['pause_probability']

    def _should_correct(self) -> bool:
        """Decide randomly whether to simulate a typo and its correction."""
        return random.random() < self.config['correction_probability']

    def _get_pause_duration(self) -> float:
        """Return a random pause length (seconds) from the configured range."""
        return random.uniform(
            self.config['min_pause_duration'],
            self.config['max_pause_duration'],
        )

    def _get_wrong_char(self, correct_char: str) -> str:
        """Return a plausible mistyped character for ``correct_char``.

        ASCII letters yield an alphabet neighbour (never the letter itself)
        with the original case preserved. Anything else yields a random
        lowercase ASCII letter.
        """
        lowered = correct_char.lower()
        # Restrict the shift to single ASCII letters: lower() can expand some
        # characters to several code points, which ord() would reject.
        if len(lowered) == 1 and 'a' <= lowered <= 'z':
            code = ord(lowered)
            neighbours = [
                chr(candidate)
                for candidate in (code - 1, code + 1)
                if ord('a') <= candidate <= ord('z')
            ]
            wrong_char = random.choice(neighbours)
            return wrong_char.upper() if correct_char.isupper() else wrong_char
        return random.choice(self._FALLBACK_CHARS)

    def _record_action(self, action_type: str):
        """No-op hook kept for compatibility."""
        pass

    async def _sleep(self, duration: float):
        """Sleep ``duration`` seconds.

        Uses ``asyncio.sleep`` for async pages and the blocking
        ``time.sleep`` for sync pages.
        """
        if self.is_async:
            await asyncio.sleep(duration)
        else:
            time.sleep(duration)

    async def _type_char(self, char: str, delay: float):
        """Type ``char`` through the page keyboard.

        ``delay`` is in seconds and is converted to milliseconds for
        ``keyboard.type``. A further 20-50 ms pause follows.
        """
        if self.is_async:
            await self.page.keyboard.type(char, delay=delay * 1000)
        else:
            self.page.keyboard.type(char, delay=delay * 1000)
        await self._sleep(random.uniform(0.02, 0.05))

    async def _press_key(self, key: str):
        """Press ``key`` (or a key combination) on the page keyboard."""
        if self.is_async:
            await self.page.keyboard.press(key)
        else:
            self.page.keyboard.press(key)
        self._record_action(f"key_{key}")

    async def _click_element(self, selector: str):
        """Click the element matching ``selector``."""
        if self.is_async:
            await self.page.click(selector)
        else:
            self.page.click(selector)
        self._record_action("click")

    async def _wait_for_selector(self, selector: str, timeout: int = 30000):
        """Wait for ``selector``; ``timeout`` is passed through to the page call."""
        if self.is_async:
            await self.page.wait_for_selector(selector, timeout=timeout)
        else:
            self.page.wait_for_selector(selector, timeout=timeout)

    async def _perform_random_action(self):
        """Insert a short random pause (0.1-0.3 s)."""
        await self._sleep(random.uniform(0.1, 0.3))

    async def type_text_human(self, selector: str, text: str, clear_first: bool = True) -> bool:
        """Type ``text`` into the element matching ``selector`` in a human-like way.

        Args:
            selector: Element selector.
            text: Text to type.
            clear_first: When true, select all and delete before typing.

        Returns:
            True on success; False if any exception was raised (the error
            is printed, not propagated).
        """
        try:
            # Reset per-run bookkeeping.
            self.last_delays = []
            self.action_history = []
            self.input_timestamps = []

            await self._wait_for_selector(selector)

            # Hesitate, then allow for the cursor travelling to the element.
            await self._sleep(random.uniform(*self.config['click_delay_before']))
            await self._sleep(random.uniform(0.1, 0.3))

            await self._click_element(selector)
            await self._sleep(random.uniform(*self.config['click_delay_after']))

            if clear_first:
                # Select-all + delete, with pauses between the steps.
                # NOTE(review): "Control+A" is presumably not select-all on
                # macOS - confirm the target platforms.
                await self._sleep(random.uniform(0.2, 0.4))
                await self._press_key("Control+A")
                await self._sleep(random.uniform(0.15, 0.3))
                await self._press_key("Delete")
                await self._sleep(random.uniform(0.3, 0.6))

            # Long texts are typed in chunks when chunking is enabled.
            if self.config['chunk_input'] and len(text) > self.config['max_chunk_length']:
                await self._type_text_in_chunks(text)
            else:
                await self._type_text_continuously(text)

            # Optional pause after finishing, then a short final one.
            if random.random() < self.config['finish_pause_probability']:
                await self._sleep(random.uniform(*self.config['finish_pause_duration']))
            await self._sleep(random.uniform(0.2, 0.5))
            return True
        except Exception as e:
            print(f"人类化输入失败: {e}")
            return False

    async def _type_text_continuously(self, text: str):
        """Type ``text`` character by character.

        Before each character there may be a thinking pause and/or a
        simulated typo that is immediately removed with Backspace.
        """
        for char in text:
            if self._should_pause():
                await self._sleep(self._get_pause_duration())

            if self._should_correct():
                wrong_char = self._get_wrong_char(char)
                await self._type_char(wrong_char, self._calculate_typing_delay())
                await self._sleep(random.uniform(0.1, 0.3))  # noticing the typo
                await self._press_key("Backspace")  # remove the wrong character
                await self._sleep(random.uniform(0.1, 0.2))  # pause after deleting

            await self._type_char(char, self._calculate_typing_delay())

    async def _type_text_in_chunks(self, text: str):
        """Type ``text`` in chunks of at most ``max_chunk_length`` characters.

        A non-final chunk is cut just after the last space inside it when
        one exists. A pause from ``chunk_pause_duration`` separates chunks.
        """
        chunk_size = max(1, int(self.config['max_chunk_length']))
        total = len(text)
        pos = 0
        # Advance by the length actually typed so that no character is
        # skipped or typed twice when a chunk is shortened to end at a space.
        while pos < total:
            end = min(pos + chunk_size, total)
            if end < total:
                space_pos = text.rfind(' ', pos, end)
                if space_pos > pos:
                    end = space_pos + 1  # keep the space with this chunk

            await self._type_text_continuously(text[pos:end])
            pos = end

            if pos < total:
                await self._sleep(random.uniform(*self.config['chunk_pause_duration']))

    # The dedicated _simulate_correction and _simulate_backspace_retype
    # methods were removed; their behaviour lives in _type_text_continuously.

    async def click_and_type(self, selector: str, text: str, **kwargs) -> bool:
        """Convenience alias for ``type_text_human``.

        Args:
            selector: Element selector.
            text: Text to type.
            **kwargs: Extra arguments forwarded to ``type_text_human``.

        Returns:
            True on success, False otherwise.
        """
        return await self.type_text_human(selector, text, **kwargs)

    async def human_click(self, selector: str, wait_after: bool = True) -> bool:
        """Click the element matching ``selector`` with human-like pauses.

        Args:
            selector: Element selector.
            wait_after: Whether to pause after the click.

        Returns:
            True on success; False if any exception was raised (the error
            is printed, not propagated).
        """
        try:
            await self._wait_for_selector(selector)
            await self._sleep(random.uniform(*self.config['click_delay_before']))
            await self._click_element(selector)
            if wait_after:
                await self._sleep(random.uniform(*self.config['click_delay_after']))
            return True
        except Exception as e:
            print(f"人类化点击失败: {e}")
            return False

    async def human_scroll(self, direction: str = "down", amount: int = 3):
        """Scroll the page by pressing PageDown / PageUp repeatedly.

        Args:
            direction: "down" presses PageDown; any other value presses PageUp.
            amount: Number of key presses.
        """
        key = "PageDown" if direction == "down" else "PageUp"
        for _ in range(amount):
            await self._press_key(key)
            # Random gap between consecutive scrolls.
            await self._sleep(random.uniform(0.5, 1.2))
            self._record_action(f"scroll_{direction}")

    def update_config(self, new_config: Dict[str, Any]):
        """Merge ``new_config`` into the current configuration in place."""
        self.config.update(new_config)

    def get_config(self) -> Dict[str, Any]:
        """Return a shallow copy of the current configuration."""
        return self.config.copy()
# Convenience factory function.
def create_human_typer(page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None) -> HumanTypingWrapper:
    """Build a ``HumanTypingWrapper`` for the given page.

    Args:
        page: Playwright page object (sync or async).
        config: Optional configuration dict passed to the wrapper.

    Returns:
        The newly constructed ``HumanTypingWrapper`` instance.
    """
    wrapper = HumanTypingWrapper(page, config)
    return wrapper