401 lines
16 KiB
Python
401 lines
16 KiB
Python
import time
|
||
import random
|
||
import re
|
||
import asyncio
|
||
import jieba
|
||
import jieba.posseg as pseg
|
||
from typing import Dict, List, Optional
|
||
|
||
# 初始化结巴分词的词典
|
||
# Runs once at import time so jieba's dictionary is set up before any
# segmentation call below (presumably to avoid paying the load cost on the
# first `pseg.cut` -- confirm against the jieba docs).
jieba.initialize()
|
||
|
||
class EnhancedHumanTypingSimulator:
    """Type text into a page the way a (slightly tired) human would.

    Text is split into paragraphs, then into word chunks using jieba
    part-of-speech tagging, then grouped into small "semantic blocks".
    Randomized delays, occasional typos, distractions and rest breaks are
    inserted while the characters are sent through ``page.keyboard``.

    ``page`` must provide ``keyboard.type``, ``keyboard.press`` and
    ``wait_for_selector`` coroutines (the call shapes match Playwright's
    async API -- NOTE(review): confirm the concrete page type).
    """

    # Full-width punctuation that closes a semantic block and is typed with
    # the longer ``punct_delay``.
    _PUNCTUATION = ',。!?、;:'

    def __init__(self, page=None):
        self.page = page

        # Timing / behavior knobs. Tuples are (low, high) ranges. The
        # ``*_delay`` ranges for characters are passed as ``delay=`` to
        # ``keyboard.type``; the ``*_pause`` ranges are passed to
        # ``asyncio.sleep`` (seconds).
        self.config = {
            'char_delay': (80, 150),          # per ordinary character
            'punct_delay': (150, 250),        # per punctuation character
            'paragraph_pause': (0.5, 1.0),    # pause after finishing a paragraph
            'natural_pause': 0.08,            # probability of a "thinking" pause
            'thought_pause': (0.2, 0.4),      # length of a "thinking" pause
            'word_pause': (0.1, 0.25),        # pause between word chunks
            'chunk_pause': (0.2, 0.4),        # pause between semantic blocks
            'char_count_pause': (25, 35),     # not read anywhere in this class
            'char_count_delay': (0.1, 0.3),   # not read anywhere in this class
            'fatigue_threshold': 300,         # characters needed to reach fatigue 1.0
            'error_rate_base': 0.01,          # typo probability when fully rested
            'error_rate_max': 0.05,           # upper bound of the typo probability
            'distraction_probability': 0.02,  # per-chunk chance of a distraction
        }

        # Mutable simulation state.
        self.state = {
            'fatigue': 0.0,           # 0..1, grows with characters typed
            'attention': 1.0,         # 0..1, decays during long typing runs
            'chars_typed': 0,         # total characters accounted for
            'last_break_time': 0,     # time.time() of the last break
            'continuous_typing': 0,   # characters typed since the last break
        }

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Type ``text`` paragraph by paragraph.

        Args:
            text: Text to type. Paragraphs are separated by a blank line
                (``'\\n\\n'``); whitespace-only paragraphs are skipped.
            selector: Optional selector of the input element to click first.

        Returns:
            ``True`` when everything was typed, ``False`` if any exception
            occurred (the exception is printed, not raised).
        """
        try:
            if selector:
                await self._prepare_input(selector)

            paragraphs = text.split('\n\n')
            last_index = len(paragraphs) - 1

            for index, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue

                await self._type_paragraph(paragraph)

                if index < last_index:
                    # Pause as if thinking, then re-create the blank line
                    # that separated the paragraphs.
                    await asyncio.sleep(random.uniform(*self.config['paragraph_pause']))
                    await self.page.keyboard.press("Enter")
                    await asyncio.sleep(random.uniform(0.1, 0.2))
                    await self.page.keyboard.press("Enter")
                    # Settle before starting the next paragraph.
                    await asyncio.sleep(random.uniform(0.8, 1.5))

            return True

        except Exception as e:
            print(f"输入文本时出错: {e}")
            return False

    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split ``text`` into typing chunks using jieba POS tagging.

        Punctuation/symbols (jieba flag ``'x'``), whitespace and tokens that
        start with a non-word, non-CJK character become chunks of their own.
        Nouns, verbs and adjectives are emitted on their own (split into
        two-character pieces when longer than three characters). Every other
        token is accumulated until the buffer holds at least three characters.
        """
        chunks: List[str] = []
        pending = ""

        for word, flag in pseg.cut(text):
            stands_alone = (
                flag == 'x'
                or word.isspace()
                or re.match(r'[^\u4e00-\u9fff\w\s]', word)
            )
            is_content_word = flag in ('n', 'v', 'a')

            if stands_alone or is_content_word:
                # Anything buffered so far must be emitted first to keep order.
                if pending:
                    chunks.append(pending)
                    pending = ""
                if not stands_alone and len(word) > 3:
                    chunks.extend(self._split_long_word(word))
                else:
                    chunks.append(word)
                continue

            # Function words etc.: glue together until the chunk is long enough.
            pending += word
            if len(pending) >= 3:
                chunks.append(pending)
                pending = ""

        if pending:
            chunks.append(pending)

        return chunks

    def _split_long_word(self, word: str) -> List[str]:
        """Cut ``word`` into consecutive two-character pieces (last may be one)."""
        return [word[start:start + 2] for start in range(0, len(word), 2)]

    def _needs_break(self) -> bool:
        """Return whether fatigue/attention call for a rest; does not change state."""
        return self.state['fatigue'] > 0.7 or self.state['attention'] < 0.5

    def _update_state(self, chars_typed: int = 1) -> bool:
        """Account for ``chars_typed`` newly typed characters.

        Returns:
            ``True`` when the updated state says a break is needed.
        """
        now = time.time()

        # The continuous-typing counter only accumulates once more than five
        # seconds have passed since the last recorded break.
        if now - self.state['last_break_time'] > 5:
            self.state['continuous_typing'] += chars_typed

        gained = chars_typed / self.config['fatigue_threshold']
        self.state['fatigue'] = min(1.0, self.state['fatigue'] + gained)

        # Long uninterrupted runs erode attention a little on every update.
        if self.state['continuous_typing'] > 100:
            self.state['attention'] *= 0.95

        self.state['chars_typed'] += chars_typed

        return self._needs_break()

    def _take_break(self):
        """Apply the effect of a rest: halve fatigue, restore attention."""
        self.state['fatigue'] *= 0.5
        self.state['attention'] = min(1.0, self.state['attention'] * 1.5)
        self.state['continuous_typing'] = 0
        self.state['last_break_time'] = time.time()

    def _get_current_error_rate(self) -> float:
        """Return the typo probability for the current fatigue and attention.

        The value never exceeds ``config['error_rate_max']``.
        """
        base_rate = self.config['error_rate_base']
        ceiling = self.config['error_rate_max']
        fatigue_part = self.state['fatigue'] * (ceiling - base_rate)
        attention_part = (1 - self.state['attention']) * 0.05
        return min(ceiling, base_rate + fatigue_part + attention_part)

    async def _simulate_error(self, char: str):
        """Type a wrong character, delete it, then type ``char``."""
        # Pick the typo from a pool matching the script of the intended char.
        if '\u4e00' <= char <= '\u9fff':
            candidates = '的地得了着过去来到和与及'
        else:
            candidates = 'asdfjkl;'
        wrong_char = random.choice(candidates)

        await self.page.keyboard.type(wrong_char)
        await asyncio.sleep(random.uniform(0.2, 0.5))  # time to notice the typo

        await self.page.keyboard.press('Backspace')
        await asyncio.sleep(random.uniform(0.1, 0.3))  # short pause after deleting

        await self.page.keyboard.type(char)

    async def _simulate_distraction(self):
        """Pause briefly; a distraction also counts as a break."""
        await asyncio.sleep(random.uniform(0.8, 1.5))
        self._take_break()

    def _group_semantic_blocks(self, chunks: List[str]) -> List[List[str]]:
        """Group chunks into blocks ending at punctuation or after 2-3 chunks."""
        blocks: List[List[str]] = []
        current: List[str] = []
        words_in_block = 0

        for chunk in chunks:
            current.append(chunk)
            if chunk in self._PUNCTUATION:
                close_block = True
            else:
                words_in_block += 1
                # The limit is re-drawn on every check, as in a coin flip
                # between closing after two or after three chunks.
                close_block = words_in_block >= random.randint(2, 3)

            if close_block:
                blocks.append(current)
                current = []
                words_in_block = 0

        if current:
            blocks.append(current)
        return blocks

    async def _type_char(self, char: str):
        """Type one character, possibly with a simulated typo first."""
        # Typos only happen once fatigue is noticeable.
        if (self.state['fatigue'] > 0.6
                and random.random() < self._get_current_error_rate()):
            await self._simulate_error(char)
            return

        if char in self._PUNCTUATION:
            delay = random.randint(*self.config['punct_delay'])
            delay = int(delay * (1 + self.state['fatigue'] * 0.5))
            await self.page.keyboard.type(char, delay=delay)
            # Always pause after punctuation.
            await asyncio.sleep(random.uniform(*self.config['word_pause']))
        else:
            delay = random.randint(*self.config['char_delay'])
            # Fatigue slows typing down, attention speeds it up.
            factor = 1 + self.state['fatigue'] * 0.5 - self.state['attention'] * 0.2
            await self.page.keyboard.type(char, delay=int(delay * factor))

    async def _type_paragraph(self, paragraph: str):
        """Type one paragraph chunk by chunk with human-like pauses.

        State is updated exactly once per character typed (including
        characters typed through the typo path); the per-chunk rest check
        only reads the state, so characters are not counted twice.
        """
        chunks = self._split_text_into_chunks(paragraph)

        for block in self._group_semantic_blocks(chunks):
            # Occasionally "think" before starting a block.
            if random.random() < self.config['natural_pause']:
                await asyncio.sleep(random.uniform(*self.config['thought_pause']))

            for chunk in block:
                if self._needs_break():
                    await asyncio.sleep(random.uniform(0.5, 1.0))
                    self._take_break()

                if (random.random() < self.config['distraction_probability']
                        and len(chunk) > 2):
                    await self._simulate_distraction()

                for char in chunk:
                    await self._type_char(char)
                    self._update_state()

                # Pause between chunks; fatigue stretches it.
                pause_time = random.uniform(*self.config['word_pause'])
                pause_time *= (1 + self.state['fatigue'] * 0.3)
                await asyncio.sleep(pause_time)

            # Pause between semantic blocks.
            pause_time = random.uniform(*self.config['chunk_pause'])
            pause_time *= (1 + self.state['fatigue'] * 0.5 - self.state['attention'] * 0.2)
            await asyncio.sleep(pause_time)

    async def _prepare_input(self, selector: str):
        """Wait for ``selector`` (5000 timeout), click it and pause briefly.

        Raises:
            Exception: whatever the page raised, after printing it.
        """
        try:
            element = await self.page.wait_for_selector(selector, timeout=5000)
            await element.click()
            await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            print(f"准备输入失败: {e}")
            raise
|
||
|
||
class OptimizedXHSTyping:
    """Lightweight character-by-character typing simulator for Xiaohongshu.

    ``page`` must provide ``keyboard.type(char, delay=...)`` and
    ``keyboard.press(key)`` coroutines.
    """

    # Full-width punctuation typed with the longer ``punct_delay``.
    _PUNCTUATION = ',。!?、;:'

    def __init__(self, page):
        self.page = page
        # Tuples are (low, high) ranges. ``*_delay`` values for characters are
        # passed as ``delay=`` to ``keyboard.type``; pauses go to
        # ``asyncio.sleep`` (seconds).
        self.typing_config = {
            'char_delay': (100, 200),       # per ordinary character
            'punct_delay': (200, 300),      # per punctuation character
            'paragraph_pause': (0.5, 1.0),  # pause after finishing a paragraph
            'natural_pause': 0.05,          # per-character chance of a short pause
            'enter_delay': (0.1, 0.2),      # gap between the two Enter presses
        }

    async def type_text(self, text: str):
        """Type ``text``, re-creating the blank line between paragraphs.

        Paragraphs are separated by ``'\\n\\n'``. After each paragraph except
        the last, the simulator pauses and presses Enter twice so the
        separator removed by the split is not lost from the typed output.
        """
        paragraphs = text.split('\n\n')
        last_index = len(paragraphs) - 1

        for index, para in enumerate(paragraphs):
            await self._type_paragraph(para)

            if index < last_index:
                await asyncio.sleep(
                    random.uniform(*self.typing_config['paragraph_pause'])
                )
                # ``.get`` keeps working if a caller replaced typing_config
                # with a dict that predates the 'enter_delay' key.
                enter_delay = self.typing_config.get('enter_delay', (0.1, 0.2))
                await self.page.keyboard.press("Enter")
                await asyncio.sleep(random.uniform(*enter_delay))
                await self.page.keyboard.press("Enter")

    async def _type_paragraph(self, paragraph: str):
        """Type every character of ``paragraph`` with randomized delays."""
        typed = 0

        for char in paragraph:
            # Occasional short hesitation before a character.
            if random.random() < self.typing_config['natural_pause']:
                await asyncio.sleep(random.uniform(0.2, 0.5))

            if char in self._PUNCTUATION:
                delay_range = self.typing_config['punct_delay']
            else:
                delay_range = self.typing_config['char_delay']

            await self.page.keyboard.type(char, delay=random.randint(*delay_range))
            typed += 1

            # The divisor is re-drawn for every character, so this is a
            # chance-based pause that can only trigger from the 20th
            # character on, not a fixed interval.
            if typed % random.randint(20, 30) == 0:
                await asyncio.sleep(random.uniform(0.1, 0.3))
|
||
|
||
class XHSEnhancedTyping(EnhancedHumanTypingSimulator):
    """Xiaohongshu-specific typing simulator with hashtag (tag) input."""

    def __init__(self, page=None):
        super().__init__(page)
        # True only while handle_tag_input() is typing a tag.
        self.tag_mode = False

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Record the speed profile for the current mode, then type ``text``.

        The profile is stored in ``self.config`` (the configuration dict the
        base class defines); the previous code wrote to ``self.base_config``,
        which does not exist and raised ``AttributeError`` on every call.

        NOTE(review): nothing visible in this file reads
        ``min_typing_speed`` / ``max_typing_speed``; confirm whether the
        base-class delays should be derived from them.

        Returns:
            The result of the base-class ``type_text``.
        """
        if self.tag_mode:
            # Tags are typed with the slower profile.
            speed_profile = {'min_typing_speed': 5, 'max_typing_speed': 12}
        else:
            speed_profile = {'min_typing_speed': 8, 'max_typing_speed': 20}
        self.config.update(speed_profile)

        return await super().type_text(text, selector)

    async def _pick_suggestion(self) -> bool:
        """Try to click one of the first two tag suggestions.

        Returns:
            ``True`` if a suggestion was clicked, ``False`` if none was
            available or the lookup/click failed (best effort).
        """
        try:
            suggestions = await self.page.query_selector_all('.suggestion-item')
            if not suggestions:
                return False
            await random.choice(suggestions[:2]).click()
        except Exception as e:
            # Best effort: fall back to confirming with Enter. Catching
            # Exception (not a bare except) lets task cancellation propagate.
            print(f"选择标签建议失败: {e}")
            return False

        await asyncio.sleep(random.uniform(0.2, 0.4))
        return True

    async def handle_tag_input(self, tag: str):
        """Type ``#`` followed by ``tag`` and confirm it.

        With 70% probability one of the first two suggestions is clicked;
        otherwise (or when no suggestion can be clicked) the tag is confirmed
        with Enter. ``tag_mode`` is always reset, even if typing fails.
        """
        self.tag_mode = True
        try:
            keyboard = self.page.keyboard

            # Hold Shift while pressing "3" to produce "#". `press("Shift")`
            # would release the key immediately, so `down`/`up` are required.
            await keyboard.down("Shift")
            try:
                await asyncio.sleep(random.uniform(0.1, 0.2))
                await keyboard.press("3")
            finally:
                await keyboard.up("Shift")

            await self.type_text(tag)

            # Give the suggestion list time to appear.
            await asyncio.sleep(random.uniform(0.3, 0.5))

            if random.random() < 0.7 and await self._pick_suggestion():
                return

            # No suggestion chosen: confirm the tag with Enter.
            await keyboard.press("Enter")
            await asyncio.sleep(random.uniform(0.2, 0.4))
        finally:
            self.tag_mode = False
|