# autoUpload/utils/enhanced_human_typing.py
# (repository viewer listing: 401 lines, 16 KiB, Python)

import time
import random
import re
import asyncio
2025-10-20 09:50:22 +08:00
import jieba
import jieba.posseg as pseg
from typing import Dict, List, Optional
2025-10-20 09:50:22 +08:00
# Initialize jieba's segmentation dictionary once, when this module is imported.
jieba.initialize()
class EnhancedHumanTypingSimulator:
    """Type text through ``page.keyboard`` with human-like rhythm.

    Text is split into paragraphs, each paragraph is segmented into word
    chunks with jieba, and the chunks are typed character by character with
    randomized delays, occasional pauses, simulated typos and a simple
    fatigue/attention model.

    ``page`` is only used through ``page.keyboard.type`` / ``press`` and
    ``page.wait_for_selector`` (presumably a Playwright async ``Page`` --
    confirm against the caller).
    """

    # Full-width punctuation that closes a semantic block and is typed with
    # the slower ``punct_delay``.
    _PUNCTUATION = ',。!?、;:'

    # Matches a token that starts with something other than a CJK ideograph,
    # a word character or whitespace (emoji, symbols, ...).
    _SPECIAL_CHAR_RE = re.compile(r'[^\u4e00-\u9fff\w\s]')

    def __init__(self, page=None):
        self.page = page
        # Timing / behaviour configuration. Two-element tuples are
        # (low, high) ranges fed to random.randint / random.uniform.
        self.config = {
            'char_delay': (80, 150),         # per-character ``delay=`` passed to keyboard.type
            'punct_delay': (150, 250),       # per-punctuation ``delay=`` passed to keyboard.type
            'paragraph_pause': (0.5, 1.0),   # seconds to pause before a paragraph break
            'natural_pause': 0.08,           # probability of a pause before a semantic block
            'thought_pause': (0.2, 0.4),     # seconds of that pause
            'word_pause': (0.1, 0.25),       # seconds between chunks / after punctuation
            'chunk_pause': (0.2, 0.4),       # seconds between semantic blocks
            'char_count_pause': (25, 35),    # not read by this class
            'char_count_delay': (0.1, 0.3),  # not read by this class
            'fatigue_threshold': 300,        # characters that add 1.0 of fatigue
            'error_rate_base': 0.01,         # typo probability with no fatigue
            'error_rate_max': 0.05,          # upper bound of the typo probability
            'distraction_probability': 0.02  # per-chunk probability of a distraction pause
        }
        # Mutable simulation state.
        self.state = {
            'fatigue': 0.0,          # 0..1
            'attention': 1.0,        # 0..1
            'chars_typed': 0,        # characters accounted for so far
            'last_break_time': 0,    # time.time() of the last break (0 = never)
            'continuous_typing': 0   # characters typed since the last break
        }

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Type ``text``, optionally focusing ``selector`` first.

        Paragraphs are separated by a blank line (``'\\n\\n'``); whitespace-only
        paragraphs are skipped. After every typed paragraph except the last
        list entry, Enter is pressed twice.

        Returns:
            True on success, False if any exception was raised (the error is
            printed, not propagated).
        """
        try:
            if selector:
                await self._prepare_input(selector)

            paragraphs = text.split('\n\n')
            last_index = len(paragraphs) - 1
            for index, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue
                await self._type_paragraph(paragraph)

                if index < last_index:
                    # Pause, emit the blank line, then pause again before
                    # starting the next paragraph.
                    await asyncio.sleep(random.uniform(*self.config['paragraph_pause']))
                    await self.page.keyboard.press("Enter")
                    await asyncio.sleep(random.uniform(0.1, 0.2))
                    await self.page.keyboard.press("Enter")
                    await asyncio.sleep(random.uniform(0.8, 1.5))
            return True
        except Exception as e:
            print(f"输入文本时出错: {e}")
            return False

    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Segment ``text`` into typing chunks using jieba POS tagging.

        Tokens flagged ``'x'``, whitespace tokens and tokens starting with a
        special character always form their own chunk. Tokens flagged ``'n'``,
        ``'v'`` or ``'a'`` also stand alone and are cut into two-character
        pieces when longer than three characters. All other tokens are
        accumulated and flushed once the buffer reaches three characters.
        """
        chunks: List[str] = []
        pending = ""

        for word, flag in pseg.cut(text):
            standalone = (
                flag == 'x'
                or word.isspace()
                or self._SPECIAL_CHAR_RE.match(word) is not None
            )
            if standalone or flag in ('n', 'v', 'a'):
                # Flush whatever was being accumulated before this token.
                if pending:
                    chunks.append(pending)
                    pending = ""
                if not standalone and len(word) > 3:
                    chunks.extend(self._split_long_word(word))
                else:
                    chunks.append(word)
                continue

            pending += word
            if len(pending) >= 3:
                chunks.append(pending)
                pending = ""

        if pending:
            chunks.append(pending)
        return chunks

    def _split_long_word(self, word: str) -> List[str]:
        """Cut ``word`` into consecutive two-character pieces (last may be one)."""
        return [word[start:start + 2] for start in range(0, len(word), 2)]

    def _update_state(self, chars_typed: int = 1) -> bool:
        """Account for ``chars_typed`` characters and report whether to rest.

        Returns:
            True when fatigue exceeds 0.7 or attention drops below 0.5.
        """
        state = self.state
        # Only count towards the continuous run when the last break was more
        # than five seconds ago.
        if time.time() - state['last_break_time'] > 5:
            state['continuous_typing'] += chars_typed

        gained = chars_typed / self.config['fatigue_threshold']
        state['fatigue'] = min(1.0, state['fatigue'] + gained)

        # Attention decays once more than 100 characters were typed in a row.
        if state['continuous_typing'] > 100:
            state['attention'] *= 0.95

        state['chars_typed'] += chars_typed
        return state['fatigue'] > 0.7 or state['attention'] < 0.5

    def _take_break(self):
        """Apply the effect of a break: halve fatigue, boost attention, reset the run."""
        state = self.state
        state['fatigue'] *= 0.5
        state['attention'] = min(1.0, state['attention'] * 1.5)
        state['continuous_typing'] = 0
        state['last_break_time'] = time.time()

    def _get_current_error_rate(self) -> float:
        """Return the typo probability for the current fatigue and attention.

        The value grows from ``error_rate_base`` with fatigue and lost
        attention and is capped at ``error_rate_max``.
        """
        base_rate = self.config['error_rate_base']
        max_rate = self.config['error_rate_max']
        fatigue_part = self.state['fatigue'] * (max_rate - base_rate)
        attention_part = (1 - self.state['attention']) * 0.05
        return min(max_rate, base_rate + fatigue_part + attention_part)

    async def _simulate_error(self, char: str):
        """Type a wrong character, delete it, then type ``char``."""
        pool = '的地得了着过去来到和与及' if '\u4e00' <= char <= '\u9fff' else 'asdfjkl;'
        # Never pick the intended character itself -- that would not be a typo.
        wrong_char = random.choice([c for c in pool if c != char])

        keyboard = self.page.keyboard
        await keyboard.type(wrong_char)
        await asyncio.sleep(random.uniform(0.2, 0.5))  # time to notice the typo
        await keyboard.press('Backspace')
        await asyncio.sleep(random.uniform(0.1, 0.3))  # pause after deleting
        await keyboard.type(char)

    async def _simulate_distraction(self):
        """Pause for 0.8-1.5 seconds; the pause also counts as a break."""
        await asyncio.sleep(random.uniform(0.8, 1.5))
        self._take_break()

    def _group_semantic_chunks(self, chunks: List[str]) -> List[List[str]]:
        """Group chunks into semantic blocks.

        A block ends at a punctuation chunk, or once it holds a randomly
        chosen 2-3 non-punctuation chunks.
        """
        blocks: List[List[str]] = []
        current: List[str] = []
        word_count = 0
        for chunk in chunks:
            current.append(chunk)
            if chunk in self._PUNCTUATION:
                blocks.append(current)
                current, word_count = [], 0
                continue
            word_count += 1
            if word_count >= random.randint(2, 3):
                blocks.append(current)
                current, word_count = [], 0
        if current:
            blocks.append(current)
        return blocks

    async def _type_char(self, char: str):
        """Type one character, possibly via a simulated typo."""
        state = self.state
        # Typos are only simulated once fatigue is noticeable.
        if state['fatigue'] > 0.6 and random.random() < self._get_current_error_rate():
            await self._simulate_error(char)
            return

        if char in self._PUNCTUATION:
            delay = random.randint(*self.config['punct_delay'])
            delay = int(delay * (1 + state['fatigue'] * 0.5))
            await self.page.keyboard.type(char, delay=delay)
            # Always pause after punctuation.
            await asyncio.sleep(random.uniform(*self.config['word_pause']))
        else:
            delay = random.randint(*self.config['char_delay'])
            # Fatigue slows typing down, attention speeds it up.
            delay = int(delay * (1 + state['fatigue'] * 0.5 - state['attention'] * 0.2))
            await self.page.keyboard.type(char, delay=delay)

    async def _type_paragraph(self, paragraph: str):
        """Type one paragraph chunk by chunk with pauses and state tracking.

        Characters are accounted for exactly once, per chunk, so that
        ``chars_typed`` and the fatigue gain match ``fatigue_threshold``.
        """
        chunks = self._split_text_into_chunks(paragraph)

        for block in self._group_semantic_chunks(chunks):
            # Occasionally hesitate before a block.
            if random.random() < self.config['natural_pause']:
                await asyncio.sleep(random.uniform(*self.config['thought_pause']))

            for chunk in block:
                if self._update_state(len(chunk)):
                    # Tired or unfocused: take a short rest first.
                    await asyncio.sleep(random.uniform(0.5, 1.0))
                    self._take_break()

                if random.random() < self.config['distraction_probability'] and len(chunk) > 2:
                    await self._simulate_distraction()

                for char in chunk:
                    await self._type_char(char)

                # Pause between chunks, stretched by fatigue.
                pause_time = random.uniform(*self.config['word_pause'])
                pause_time *= (1 + self.state['fatigue'] * 0.3)
                await asyncio.sleep(pause_time)

            # Pause between semantic blocks, shaped by fatigue and attention.
            pause_time = random.uniform(*self.config['chunk_pause'])
            pause_time *= (1 + self.state['fatigue'] * 0.5 - self.state['attention'] * 0.2)
            await asyncio.sleep(pause_time)

    async def _prepare_input(self, selector: str):
        """Wait up to 5000 ms for ``selector``, click it, then pause briefly.

        Raises:
            Exception: whatever the page raised, re-raised after printing.
        """
        try:
            element = await self.page.wait_for_selector(selector, timeout=5000)
            await element.click()
            await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            print(f"准备输入失败: {e}")
            raise
2025-10-20 09:50:22 +08:00
class OptimizedXHSTyping:
    """Lightweight character-by-character typing simulator for Xiaohongshu.

    ``page`` is only used through ``page.keyboard.type`` / ``press``
    (presumably a Playwright async ``Page`` -- confirm against the caller).
    """

    # Full-width punctuation typed with the slower ``punct_delay``.
    _PUNCTUATION = ',。!?、;:'

    def __init__(self, page):
        self.page = page
        # Two-element tuples are (low, high) ranges.
        self.typing_config = {
            'char_delay': (100, 200),       # per-character ``delay=`` passed to keyboard.type
            'punct_delay': (200, 300),      # per-punctuation ``delay=`` passed to keyboard.type
            'paragraph_pause': (0.5, 1.0),  # seconds to pause between paragraphs
            'natural_pause': 0.05           # per-character probability of a short pause
        }

    async def type_text(self, text: str):
        """Type ``text`` paragraph by paragraph.

        Paragraphs are separated by a blank line (``'\\n\\n'``). Between two
        paragraphs the simulator pauses and presses Enter twice, so the
        separator removed by the split is typed back into the page instead
        of the paragraphs being run together.
        """
        paragraphs = text.split('\n\n')
        last_index = len(paragraphs) - 1
        for index, para in enumerate(paragraphs):
            await self._type_paragraph(para)
            if index < last_index:
                await asyncio.sleep(random.uniform(*self.typing_config['paragraph_pause']))
                # Re-create the blank line that separated the paragraphs.
                await self.page.keyboard.press("Enter")
                await self.page.keyboard.press("Enter")

    async def _type_paragraph(self, paragraph: str):
        """Type every character of ``paragraph`` with randomized delays."""
        config = self.typing_config
        for char_count, char in enumerate(paragraph, start=1):
            # Occasional short hesitation before a character.
            if random.random() < config['natural_pause']:
                await asyncio.sleep(random.uniform(0.2, 0.5))

            delay_range = config['punct_delay'] if char in self._PUNCTUATION else config['char_delay']
            await self.page.keyboard.type(char, delay=random.randint(*delay_range))

            # The modulus is re-drawn for every character, so this pause
            # fires at random once at least 20 characters have been typed.
            if char_count % random.randint(20, 30) == 0:
                await asyncio.sleep(random.uniform(0.1, 0.3))
2025-10-20 09:50:22 +08:00
class XHSEnhancedTyping(EnhancedHumanTypingSimulator):
    """Xiaohongshu-specific typing simulator with hashtag input support."""

    def __init__(self, page=None):
        super().__init__(page)
        # True while a hashtag is being entered by handle_tag_input.
        self.tag_mode = False

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Record the speed profile for the current mode, then type ``text``.

        The speed keys are stored in ``self.config``, the dictionary created
        by the base class; this class has no ``base_config`` attribute.

        NOTE(review): nothing visible in this file reads ``min_typing_speed``
        / ``max_typing_speed``, so they do not yet change the actual timing.

        Returns:
            The result of the base class ``type_text``.
        """
        if self.tag_mode:
            # Slower profile while typing a hashtag.
            speed_profile = {'min_typing_speed': 5, 'max_typing_speed': 12}
        else:
            # Regular text profile.
            speed_profile = {'min_typing_speed': 8, 'max_typing_speed': 20}
        self.config.update(speed_profile)
        return await super().type_text(text, selector)

    async def handle_tag_input(self, tag: str):
        """Type ``#`` followed by ``tag`` and confirm the hashtag.

        With 70% probability one of the first two ``.suggestion-item``
        elements is clicked; otherwise, or when no suggestion is available or
        the click fails, Enter is pressed. ``tag_mode`` is always reset on
        exit, including when an exception propagates.
        """
        keyboard = self.page.keyboard
        self.tag_mode = True
        try:
            # Hold Shift while pressing "3" to produce "#". press("Shift")
            # would release the key again before "3" is sent.
            await keyboard.down("Shift")
            try:
                await asyncio.sleep(random.uniform(0.1, 0.2))
                await keyboard.press("3")
            finally:
                await keyboard.up("Shift")

            await self.type_text(tag)

            # Give the suggestion list time to appear.
            await asyncio.sleep(random.uniform(0.3, 0.5))

            if random.random() < 0.7:
                try:
                    suggestions = await self.page.query_selector_all('.suggestion-item')
                    if suggestions:
                        await random.choice(suggestions[:2]).click()
                        await asyncio.sleep(random.uniform(0.2, 0.4))
                        return
                except Exception as e:
                    # Best effort: fall back to confirming with Enter. Task
                    # cancellation is not an Exception and still propagates.
                    print(f"选择标签建议失败: {e}")

            # No suggestion chosen: confirm the tag with Enter.
            await keyboard.press("Enter")
            await asyncio.sleep(random.uniform(0.2, 0.4))
        finally:
            self.tag_mode = False