autoUpload/utils/human_typing_wrapper.py

363 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
from playwright.sync_api import Page
from playwright.async_api import Page as AsyncPage
from .human_like import HumanLikeTyper
import asyncio
from typing import Union, Optional, List, Dict, Any
class HumanTypingWrapper:
"""
人类化输入包装器,提供更高级的人类化输入功能
支持同步和异步页面操作
"""
def __init__(self, page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None):
self.page = page
self.is_async = hasattr(page, 'wait_for_timeout') # 检测是否为异步页面
# 初始化配置
self.config = self._init_config(config)
# 初始化human_like模块仅用于同步页面
if not self.is_async:
self.human_typer = HumanLikeTyper(page)
def _init_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""初始化配置参数"""
default_config = {
# 输入速度设置(字符/秒)
'min_typing_speed': 2,
'max_typing_speed': 10,
# 思考停顿设置
'pause_probability': 0.15, # 15%的概率停顿
'min_pause_duration': 0.3,
'max_pause_duration': 2.5,
# 错误修正设置
'correction_probability': 0.0, # 禁用错误修正,确保文字准确性
'backspace_probability': 0.0, # 禁用退格重输,确保文字准确性
# 点击前后停顿
'click_delay_before': (0.1, 0.4),
'click_delay_after': (0.2, 0.6),
# 输入完成后停顿
'finish_pause_probability': 0.4, # 40%的概率在输入完成后停顿
'finish_pause_duration': (0.3, 1.2),
# 分段输入设置(长文本分段输入)
'chunk_input': True,
'max_chunk_length': 50,
'chunk_pause_duration': (0.5, 1.5),
# 模拟疲劳效果
'fatigue_effect': True,
'fatigue_threshold': 100, # 输入超过100个字符后开始疲劳
'fatigue_slowdown': 0.3, # 疲劳后速度减慢30%
}
if config:
default_config.update(config)
return default_config
def _calculate_typing_delay(self, char_index: int = 0) -> float:
"""计算每个字符之间的延迟时间,考虑疲劳效果"""
base_speed = random.uniform(
self.config['min_typing_speed'],
self.config['max_typing_speed']
)
# 计算疲劳效果
if (self.config['fatigue_effect'] and
char_index > self.config['fatigue_threshold']):
fatigue_factor = 1 + self.config['fatigue_slowdown']
base_speed /= fatigue_factor
delay = 1 / base_speed + random.uniform(-0.05, 0.15)
return max(0.01, delay) # 确保延迟不为负数
def _should_pause(self) -> bool:
"""判断是否应该停顿思考"""
return random.random() < self.config['pause_probability']
def _should_correct(self) -> bool:
"""判断是否应该进行错误修正"""
return random.random() < self.config['correction_probability']
def _should_backspace(self) -> bool:
"""判断是否应该退格重新输入"""
return random.random() < self.config['backspace_probability']
def _get_random_pause_duration(self) -> float:
"""获取随机停顿时长"""
return random.uniform(
self.config['min_pause_duration'],
self.config['max_pause_duration']
)
def _get_wrong_char(self, correct_char: str) -> str:
"""生成错误字符"""
# 键盘相邻字符映射
keyboard_neighbors = {
'a': 'sqw', 'b': 'vghn', 'c': 'xdfv', 'd': 'erfcxs', 'e': 'wsdr',
'f': 'rtgvcd', 'g': 'tyhbvf', 'h': 'yujnbg', 'i': 'ujko', 'j': 'uikmnh',
'k': 'iolmj', 'l': 'opmk', 'm': 'njk', 'n': 'bhjm', 'o': 'iklp',
'p': 'ol', 'q': 'wa', 'r': 'edft', 's': 'awedxz', 't': 'rfgy',
'u': 'yhij', 'v': 'cfgb', 'w': 'qase', 'x': 'zsdc', 'y': 'tghu',
'z': 'asx'
}
char_lower = correct_char.lower()
if char_lower in keyboard_neighbors:
neighbors = keyboard_neighbors[char_lower]
wrong_char = random.choice(neighbors)
# 保持原始大小写
return wrong_char.upper() if correct_char.isupper() else wrong_char
else:
# 如果不在映射中,返回随机字符
return random.choice('abcdefghijklmnopqrstuvwxyz')
async def _sleep(self, duration: float):
"""统一的睡眠方法,兼容同步和异步"""
if self.is_async:
await asyncio.sleep(duration)
else:
time.sleep(duration)
async def _type_char(self, char: str, delay: float):
"""输入单个字符,兼容同步和异步"""
if self.is_async:
await self.page.keyboard.type(char, delay=delay * 1000)
else:
self.page.keyboard.type(char, delay=delay * 1000)
async def _press_key(self, key: str):
"""按键操作,兼容同步和异步"""
if self.is_async:
await self.page.keyboard.press(key)
else:
self.page.keyboard.press(key)
async def _click_element(self, selector: str):
"""点击元素,兼容同步和异步"""
if self.is_async:
await self.page.click(selector)
else:
self.page.click(selector)
async def _wait_for_selector(self, selector: str, timeout: int = 30000):
"""等待选择器,兼容同步和异步"""
if self.is_async:
await self.page.wait_for_selector(selector, timeout=timeout)
else:
self.page.wait_for_selector(selector, timeout=timeout)
async def type_text_human(self, selector: str, text: str, clear_first: bool = True) -> bool:
"""
以人类化方式在指定元素中输入文本
Args:
selector: 元素选择器
text: 要输入的文本
clear_first: 是否先清空现有内容
Returns:
bool: 是否成功输入
"""
try:
# 等待元素并点击
await self._wait_for_selector(selector)
# 点击前停顿
delay_before = random.uniform(*self.config['click_delay_before'])
await self._sleep(delay_before)
# 点击元素
await self._click_element(selector)
# 点击后停顿
delay_after = random.uniform(*self.config['click_delay_after'])
await self._sleep(delay_after)
# 清空现有内容
if clear_first:
await self._press_key("Control+A")
await self._sleep(0.1)
await self._press_key("Delete")
await self._sleep(0.2)
# 分段输入长文本
if self.config['chunk_input'] and len(text) > self.config['max_chunk_length']:
await self._type_text_in_chunks(text)
else:
await self._type_text_continuously(text)
# 输入完成后可能停顿
if random.random() < self.config['finish_pause_probability']:
pause_duration = random.uniform(*self.config['finish_pause_duration'])
await self._sleep(pause_duration)
return True
except Exception as e:
print(f"人类化输入失败: {e}")
return False
async def _type_text_continuously(self, text: str):
"""连续输入文本,包含人类化效果"""
current_input = ""
for i, char in enumerate(text):
# 随机停顿思考
if self._should_pause():
pause_duration = self._get_random_pause_duration()
await self._sleep(pause_duration)
# 随机错误修正
if current_input and self._should_correct():
await self._simulate_correction(char)
# 随机退格重新输入
if current_input and self._should_backspace():
await self._simulate_backspace_retype(char)
# 计算延迟并输入字符
delay = self._calculate_typing_delay(i)
await self._type_char(char, delay)
current_input += char
# 字符间微小停顿
micro_pause = random.uniform(0.01, 0.08)
await self._sleep(micro_pause)
async def _type_text_in_chunks(self, text: str):
"""分段输入长文本"""
chunk_size = self.config['max_chunk_length']
chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
for i, chunk in enumerate(chunks):
await self._type_text_continuously(chunk)
# 段落间停顿(除了最后一段)
if i < len(chunks) - 1:
pause_duration = random.uniform(*self.config['chunk_pause_duration'])
await self._sleep(pause_duration)
async def _simulate_correction(self, next_char: str):
"""模拟输入错误并修正"""
# 删除最后一个字符
await self._press_key("Backspace")
await self._sleep(random.uniform(0.1, 0.4))
# 输入错误字符
wrong_char = self._get_wrong_char(next_char)
delay = self._calculate_typing_delay()
await self._type_char(wrong_char, delay)
await self._sleep(random.uniform(0.2, 0.5))
# 删除错误字符
await self._press_key("Backspace")
await self._sleep(random.uniform(0.1, 0.3))
async def _simulate_backspace_retype(self, target_char: str):
"""模拟退格后重新输入"""
backspace_count = random.randint(1, min(3, len(target_char)))
# 执行退格
for _ in range(backspace_count):
await self._press_key("Backspace")
await self._sleep(random.uniform(0.1, 0.2))
# 短暂停顿
await self._sleep(random.uniform(0.2, 0.6))
async def click_and_type(self, selector: str, text: str, **kwargs) -> bool:
"""
点击元素并输入文本的便捷方法
Args:
selector: 元素选择器
text: 要输入的文本
**kwargs: 传递给type_text_human的额外参数
Returns:
bool: 是否成功
"""
return await self.type_text_human(selector, text, **kwargs)
async def human_click(self, selector: str, wait_after: bool = True) -> bool:
"""
人类化点击元素
Args:
selector: 元素选择器
wait_after: 点击后是否等待
Returns:
bool: 是否成功点击
"""
try:
await self._wait_for_selector(selector)
# 点击前停顿
delay_before = random.uniform(*self.config['click_delay_before'])
await self._sleep(delay_before)
# 点击
await self._click_element(selector)
# 点击后停顿
if wait_after:
delay_after = random.uniform(*self.config['click_delay_after'])
await self._sleep(delay_after)
return True
except Exception as e:
print(f"人类化点击失败: {e}")
return False
async def human_scroll(self, direction: str = "down", amount: int = 3):
"""
人类化滚动页面
Args:
direction: 滚动方向 "up""down"
amount: 滚动次数
"""
key = "PageDown" if direction == "down" else "PageUp"
for i in range(amount):
await self._press_key(key)
# 滚动间隔
pause = random.uniform(0.3, 0.8)
await self._sleep(pause)
def update_config(self, new_config: Dict[str, Any]):
"""更新配置"""
self.config.update(new_config)
def get_config(self) -> Dict[str, Any]:
"""获取当前配置"""
return self.config.copy()
# 便捷的工厂函数
def create_human_typer(page: Union[Page, AsyncPage], config: Optional[Dict[str, Any]] = None) -> HumanTypingWrapper:
"""
创建人类化输入包装器的工厂函数
Args:
page: Playwright页面对象同步或异步
config: 可选的配置字典
Returns:
HumanTypingWrapper实例
"""
return HumanTypingWrapper(page, config)