autoUpload/utils/enhanced_human_typing.py

401 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
import re
import asyncio
import jieba
import jieba.posseg as pseg
from typing import Dict, List, Optional
# Initialize the jieba segmenter's dictionary once, when this module is imported.
jieba.initialize()
class EnhancedHumanTypingSimulator:
    """Type text through ``page.keyboard`` with a human-like rhythm.

    Text is split into paragraphs, segmented into word chunks with jieba,
    grouped into small "semantic blocks" and typed one character at a time
    with randomized delays, pauses, occasional typos and short breaks that
    are driven by a simple fatigue/attention model.

    ``page`` must expose awaitable ``keyboard.type(text, delay=...)``,
    ``keyboard.press(key)`` and ``wait_for_selector(selector, timeout=...)``.
    NOTE(review): this looks like the Playwright async API -- confirm.
    """

    # Full-width punctuation that ends a semantic block and gets a longer delay.
    _PUNCTUATION = ',。!?、;:'

    # Matches a token that starts with something that is neither a CJK
    # ideograph, a word character nor whitespace (emoji, symbols, ...).
    _SYMBOL_RE = re.compile(r'[^\u4e00-\u9fff\w\s]')

    def __init__(self, page=None):
        self.page = page
        # Tunable timing/behaviour settings.
        self.config = {
            'char_delay': (80, 150),         # range passed as `delay` for ordinary characters
            'punct_delay': (150, 250),       # range passed as `delay` for punctuation
            'paragraph_pause': (0.5, 1.0),   # sleep range (s) after finishing a paragraph
            'natural_pause': 0.08,           # probability of a "thinking" pause before a block
            'thought_pause': (0.2, 0.4),     # sleep range (s) of that thinking pause
            'word_pause': (0.1, 0.25),       # sleep range (s) between word chunks
            'chunk_pause': (0.2, 0.4),       # sleep range (s) between semantic blocks
            'char_count_pause': (25, 35),    # not read by any method of this class
            'char_count_delay': (0.1, 0.3),  # not read by any method of this class
            'fatigue_threshold': 300,        # characters needed to reach full fatigue
            'error_rate_base': 0.01,         # typo probability with no fatigue
            'error_rate_max': 0.05,          # upper bound of the typo probability
            'distraction_probability': 0.02  # per-chunk probability of a distraction
        }
        # Mutable simulation state.
        self.state = {
            'fatigue': 0.0,          # fatigue level, 0-1
            'attention': 1.0,        # attention level, 0-1
            'chars_typed': 0,        # total characters accounted for
            'last_break_time': 0,    # time.time() of the last break
            'continuous_typing': 0   # characters typed since the last break
        }

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Type ``text`` paragraph by paragraph.

        Args:
            text: Text to type; paragraphs are separated by a blank line
                (``'\\n\\n'``). Blank paragraphs are skipped.
            selector: Optional selector of the input element to click first.

        Returns:
            True when everything was typed, False if any step raised (the
            error is printed, not propagated).
        """
        try:
            if selector:
                await self._prepare_input(selector)
            paragraphs = text.split('\n\n')
            last_index = len(paragraphs) - 1
            for index, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue
                await self._type_paragraph(paragraph)
                if index < last_index:
                    # Pause as if thinking; the range comes from the config
                    # so that 'paragraph_pause' actually takes effect.
                    await asyncio.sleep(
                        random.uniform(*self.config['paragraph_pause']))
                    # Two line breaks recreate the blank line between paragraphs.
                    await self.page.keyboard.press("Enter")
                    await asyncio.sleep(random.uniform(0.1, 0.2))
                    await self.page.keyboard.press("Enter")
                    # Settle before starting the next paragraph.
                    await asyncio.sleep(random.uniform(0.8, 1.5))
            return True
        except Exception as e:
            print(f"输入文本时出错: {e}")
            return False

    def _split_text_into_chunks(self, text: str) -> list:
        """Segment ``text`` into typing chunks using jieba POS tagging.

        Punctuation (flag ``'x'``), whitespace and symbol tokens become their
        own chunk. Nouns, verbs and adjectives (flags ``'n'``, ``'v'``,
        ``'a'``) are emitted on their own, split into pairs when longer than
        three characters. All other tokens are accumulated and flushed once
        the accumulated text reaches three characters.
        """
        chunks = []
        pending = ""

        def flush():
            # Emit the accumulated text, if any, as one chunk.
            nonlocal pending
            if pending:
                chunks.append(pending)
                pending = ""

        for word, flag in pseg.cut(text):
            is_separator = (
                flag == 'x'
                or word.isspace()
                or self._SYMBOL_RE.match(word)
            )
            if is_separator:
                flush()
                chunks.append(word)
            elif flag in ('n', 'v', 'a'):
                flush()
                if len(word) > 3:
                    chunks.extend(self._split_long_word(word))
                else:
                    chunks.append(word)
            else:
                pending += word
                if len(pending) >= 3:
                    flush()
        flush()
        return chunks

    def _split_long_word(self, word: str) -> List[str]:
        """Split ``word`` into consecutive two-character pieces.

        A trailing single character, if any, becomes the last piece.
        """
        return [word[start:start + 2] for start in range(0, len(word), 2)]

    def _update_state(self, chars_typed: int = 1) -> bool:
        """Account for ``chars_typed`` characters and report if a break is due.

        Returns:
            True when fatigue exceeds 0.7 or attention drops below 0.5.
        """
        state = self.state
        # Typing only counts as "continuous" once more than 5 seconds have
        # passed since the last recorded break.
        if time.time() - state['last_break_time'] > 5:
            state['continuous_typing'] += chars_typed
        state['fatigue'] = min(
            1.0,
            state['fatigue'] + chars_typed / self.config['fatigue_threshold'])
        # Attention decays while more than 100 characters were typed in a row.
        if state['continuous_typing'] > 100:
            state['attention'] *= 0.95
        state['chars_typed'] += chars_typed
        return state['fatigue'] > 0.7 or state['attention'] < 0.5

    def _take_break(self):
        """Apply the effect of a break: halve fatigue, restore attention."""
        state = self.state
        state['fatigue'] *= 0.5
        state['attention'] = min(1.0, state['attention'] * 1.5)
        state['continuous_typing'] = 0
        state['last_break_time'] = time.time()

    def _get_current_error_rate(self) -> float:
        """Return the typo probability, capped at ``error_rate_max``.

        The rate grows from ``error_rate_base`` with fatigue and with lost
        attention.
        """
        base_rate = self.config['error_rate_base']
        max_rate = self.config['error_rate_max']
        fatigue_factor = self.state['fatigue'] * (max_rate - base_rate)
        attention_factor = (1 - self.state['attention']) * 0.05
        return min(max_rate, base_rate + fatigue_factor + attention_factor)

    async def _simulate_error(self, char: str):
        """Type a wrong character, delete it, then type ``char``."""
        # Pick a plausible wrong character: common Chinese particles for CJK
        # input, home-row keys otherwise.
        if '\u4e00' <= char <= '\u9fff':
            wrong_chars = '的地得了着过去来到和与及'
        else:
            wrong_chars = 'asdfjkl;'
        await self.page.keyboard.type(random.choice(wrong_chars))
        await asyncio.sleep(random.uniform(0.2, 0.5))  # time to notice the typo
        await self.page.keyboard.press('Backspace')
        await asyncio.sleep(random.uniform(0.1, 0.3))  # pause after deleting
        await self.page.keyboard.type(char)

    async def _simulate_distraction(self):
        """Sleep briefly as if distracted; this also counts as a break."""
        await asyncio.sleep(random.uniform(0.8, 1.5))
        self._take_break()

    def _group_semantic_blocks(self, chunks: list) -> list:
        """Group word chunks into the small blocks that are typed together.

        A punctuation chunk always closes the current block; otherwise a
        block is closed after two or three chunks (chosen at random).
        """
        blocks = []
        current = []
        word_count = 0
        for chunk in chunks:
            current.append(chunk)
            if chunk in self._PUNCTUATION:
                close_block = True
            else:
                word_count += 1
                close_block = word_count >= random.randint(2, 3)
            if close_block:
                blocks.append(current)
                current = []
                word_count = 0
        if current:
            blocks.append(current)
        return blocks

    async def _type_char(self, char: str):
        """Type one character with a delay that depends on the current state."""
        fatigue = self.state['fatigue']
        if char in self._PUNCTUATION:
            delay = random.randint(*self.config['punct_delay'])
            # Fatigue lengthens the delay.
            delay = int(delay * (1 + fatigue * 0.5))
            await self.page.keyboard.type(char, delay=delay)
            # Always pause after punctuation.
            await asyncio.sleep(random.uniform(*self.config['word_pause']))
        else:
            delay = random.randint(*self.config['char_delay'])
            # Fatigue slows typing down, attention speeds it up.
            delay = int(delay * (1 + fatigue * 0.5 - self.state['attention'] * 0.2))
            await self.page.keyboard.type(char, delay=delay)

    async def _type_chunk(self, chunk: str):
        """Type one word chunk, including breaks, distractions and typos."""
        # This is the only place where typed characters are accounted for.
        # An additional _update_state() call during typing would count
        # characters more than once and inflate fatigue.
        if self._update_state(len(chunk)):
            await asyncio.sleep(random.uniform(0.5, 1.0))
            self._take_break()
        if random.random() < self.config['distraction_probability'] and len(chunk) > 2:
            await self._simulate_distraction()
        for char in chunk:
            # Typos are only simulated once fatigue is noticeable.
            if (self.state['fatigue'] > 0.6
                    and random.random() < self._get_current_error_rate()):
                await self._simulate_error(char)
                continue
            await self._type_char(char)
        # Pause between words; fatigue makes it longer.
        pause_time = random.uniform(*self.config['word_pause'])
        pause_time *= (1 + self.state['fatigue'] * 0.3)
        await asyncio.sleep(pause_time)

    async def _type_paragraph(self, paragraph: str):
        """Type one paragraph block by block with human-like pauses."""
        chunks = self._split_text_into_chunks(paragraph)
        for semantic_block in self._group_semantic_blocks(chunks):
            # Occasionally hesitate before starting a block.
            if random.random() < self.config['natural_pause']:
                await asyncio.sleep(random.uniform(*self.config['thought_pause']))
            for chunk in semantic_block:
                await self._type_chunk(chunk)
            # Pause between blocks, scaled by fatigue and attention.
            pause_time = random.uniform(*self.config['chunk_pause'])
            pause_time *= (1 + self.state['fatigue'] * 0.5
                           - self.state['attention'] * 0.2)
            await asyncio.sleep(pause_time)

    async def _prepare_input(self, selector: str):
        """Wait for ``selector`` (up to 5000 as ``timeout``) and click it.

        Raises:
            Exception: whatever the page raises; it is printed and re-raised.
        """
        try:
            element = await self.page.wait_for_selector(selector, timeout=5000)
            await element.click()
            await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            print(f"准备输入失败: {e}")
            raise
class OptimizedXHSTyping:
    """Lightweight character-by-character typing simulator for Xiaohongshu.

    ``page`` must expose awaitable ``keyboard.type(text, delay=...)`` and
    ``keyboard.press(key)``.
    NOTE(review): this looks like the Playwright async API -- confirm.
    """

    # Full-width punctuation that is typed with the longer 'punct_delay'.
    _PUNCTUATION = ',。!?、;:'

    def __init__(self, page):
        self.page = page
        self.typing_config = {
            'char_delay': (100, 200),       # range passed as `delay` for ordinary characters
            'punct_delay': (200, 300),      # range passed as `delay` for punctuation
            'paragraph_pause': (0.5, 1.0),  # sleep range (s) between paragraphs
            'natural_pause': 0.05           # per-character probability of a short pause
        }

    async def type_text(self, text: str):
        """Type ``text``; paragraphs are separated by a blank line.

        The text is split on ``'\\n\\n'``. Between two paragraphs the
        simulator pauses and presses Enter twice, so the blank line that
        separated them is typed as well instead of being dropped.
        """
        paragraphs = text.split('\n\n')
        last_index = len(paragraphs) - 1
        for index, para in enumerate(paragraphs):
            await self._type_paragraph(para)
            if index < last_index:
                await asyncio.sleep(
                    random.uniform(*self.typing_config['paragraph_pause']))
                # Re-create the '\n\n' separator that split() removed.
                await self.page.keyboard.press("Enter")
                await self.page.keyboard.press("Enter")

    async def _type_paragraph(self, paragraph: str):
        """Type one paragraph character by character."""
        # Countdown to the next rhythm pause. Drawing the interval once per
        # pause (instead of a fresh random modulus for every character)
        # gives an actual pause every 20-30 characters.
        chars_until_pause = random.randint(20, 30)
        for char in paragraph:
            # Occasional spontaneous hesitation.
            if random.random() < self.typing_config['natural_pause']:
                await asyncio.sleep(random.uniform(0.2, 0.5))
            if char in self._PUNCTUATION:
                delay = random.randint(*self.typing_config['punct_delay'])
            else:
                delay = random.randint(*self.typing_config['char_delay'])
            await self.page.keyboard.type(char, delay=delay)
            chars_until_pause -= 1
            if chars_until_pause == 0:
                await asyncio.sleep(random.uniform(0.1, 0.3))
                chars_until_pause = random.randint(20, 30)
class XHSEnhancedTyping(EnhancedHumanTypingSimulator):
    """Xiaohongshu-specific typing simulator with a hashtag ("tag") mode.

    NOTE(review): the keyboard calls assume a Playwright-style
    ``page.keyboard`` (``down``/``up``/``press``) -- confirm.
    """

    def __init__(self, page=None):
        super().__init__(page)
        self.tag_mode = False
        # type_text() records speed settings here. The attribute has to
        # exist, otherwise every type_text() call fails with AttributeError.
        # NOTE(review): nothing visible in this file reads these keys; wire
        # them into the typing delays if tag mode should really be slower.
        self.base_config: Dict[str, int] = {}

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Record the speed settings for the current mode, then type ``text``.

        Returns:
            The result of the parent ``type_text`` (True on success).
        """
        if self.tag_mode:
            # Tag mode uses the slower speed settings.
            speed = {'min_typing_speed': 5, 'max_typing_speed': 12}
        else:
            # Regular text mode.
            speed = {'min_typing_speed': 8, 'max_typing_speed': 20}
        self.base_config.update(speed)
        return await super().type_text(text, selector)

    async def _click_suggestion(self) -> bool:
        """Click one of the first two tag suggestions, if any are shown.

        Best effort: returns False when there are no suggestions or when
        the lookup/click fails. Task cancellation is not swallowed.
        """
        try:
            suggestions = await self.page.query_selector_all('.suggestion-item')
            if not suggestions:
                return False
            await random.choice(suggestions[:2]).click()
            await asyncio.sleep(random.uniform(0.2, 0.4))
            return True
        except Exception as e:
            print(f"选择标签建议失败: {e}")
            return False

    async def handle_tag_input(self, tag: str):
        """Type ``#`` followed by ``tag`` and confirm the tag.

        With 70% probability one of the first two suggestions is clicked;
        otherwise (or if that fails) the tag is confirmed with Enter.
        ``tag_mode`` is always reset, even when a step raises.
        """
        keyboard = self.page.keyboard
        self.tag_mode = True
        try:
            # Hold Shift while pressing "3" to produce '#'. press("Shift")
            # would release the key again before "3" is pressed.
            await keyboard.down("Shift")
            try:
                await asyncio.sleep(random.uniform(0.1, 0.2))
                await keyboard.press("3")
            finally:
                await keyboard.up("Shift")
            # Type the tag text itself.
            await self.type_text(tag)
            # Give the suggestion list time to appear.
            await asyncio.sleep(random.uniform(0.3, 0.5))
            if random.random() < 0.7 and await self._click_suggestion():
                return
            # No suggestion chosen: confirm with Enter.
            await keyboard.press("Enter")
            await asyncio.sleep(random.uniform(0.2, 0.4))
        finally:
            self.tag_mode = False