import asyncio
import random
import re
import time
from typing import Dict, List, Optional

import jieba
import jieba.posseg as pseg

# Load the jieba dictionary eagerly at import time so the first call to
# pseg.cut() does not pay the initialization cost.
jieba.initialize()

# Punctuation marks that close a semantic block and get the punctuation delay.
# Used with the `in` operator on strings, i.e. as a substring test.
_PUNCTUATION = ',。!?、;:'

# Matches a token whose first character is neither a CJK ideograph
# (U+4E00..U+9FFF), a word character, nor whitespace (emoji, symbols, ...).
_SPECIAL_CHAR_RE = re.compile(r'[^\u4e00-\u9fff\w\s]')


class EnhancedHumanTypingSimulator:
    """Type text into a page with human-like timing, fatigue and typos.

    The ``page`` object is expected to expose the async ``keyboard`` /
    ``wait_for_selector`` API used below (this matches Playwright's async
    Python API).
    """

    def __init__(self, page=None):
        """Store the page handle and set up the default config and state.

        Args:
            page: Page object used for keyboard input. May be ``None`` at
                construction time, but must be set before typing.
        """
        self.page = page

        # Tunable timing/behavior parameters.
        # *_delay ranges are passed as ``delay=`` to ``keyboard.type`` via
        # ``random.randint``; *_pause ranges are passed to ``asyncio.sleep``
        # (seconds) via ``random.uniform``.
        self.config = {
            'char_delay': (80, 150),          # per-character typing delay range
            'punct_delay': (150, 250),        # punctuation typing delay range
            'paragraph_pause': (0.5, 1.0),    # pause at the end of a paragraph
            'natural_pause': 0.08,            # probability of a "thinking" pause
            'thought_pause': (0.2, 0.4),      # length of a "thinking" pause
            'word_pause': (0.1, 0.25),        # pause between word chunks
            'chunk_pause': (0.2, 0.4),        # pause between semantic blocks
            'char_count_pause': (25, 35),     # not read by the code in this file
            'char_count_delay': (0.1, 0.3),   # not read by the code in this file
            'fatigue_threshold': 300,         # chars needed to add 1.0 fatigue
            'error_rate_base': 0.01,          # typo probability when rested
            'error_rate_max': 0.05,           # upper bound on typo probability
            'distraction_probability': 0.02   # chance of a distraction per chunk
        }

        # Mutable simulation state.
        self.state = {
            'fatigue': 0.0,          # fatigue level in [0, 1]
            'attention': 1.0,        # attention level in [0, 1]
            'chars_typed': 0,        # running count of characters accounted for
            'last_break_time': 0,    # time.time() of the last simulated break
            'continuous_typing': 0   # chars typed since the last break
        }

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Type ``text`` paragraph by paragraph.

        Paragraphs are separated by a blank line (``'\\n\\n'``). Blank
        paragraphs are skipped. Between paragraphs two Enter presses are sent.

        Args:
            text: The text to type.
            selector: Optional selector of the input element to click first.

        Returns:
            ``True`` on success; ``False`` if any exception was raised (the
            error is printed, not propagated).
        """
        try:
            if selector:
                await self._prepare_input(selector)

            paragraphs = text.split('\n\n')
            last_index = len(paragraphs) - 1

            for index, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue

                await self._type_paragraph(paragraph)

                if index < last_index:
                    # End of paragraph: pause as if thinking. Read from config
                    # so overrides of 'paragraph_pause' take effect.
                    await asyncio.sleep(
                        random.uniform(*self.config['paragraph_pause'])
                    )

                    # Two Enter presses produce the blank line between paragraphs.
                    await self.page.keyboard.press("Enter")
                    await asyncio.sleep(random.uniform(0.1, 0.2))
                    await self.page.keyboard.press("Enter")

                    # Short pause before starting the next paragraph.
                    await asyncio.sleep(random.uniform(0.8, 1.5))

            return True

        except Exception as e:
            # Best-effort API: report the failure and signal it via the return value.
            print(f"输入文本时出错: {e}")
            return False

    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split ``text`` into small typing chunks using jieba POS tagging.

        Punctuation (jieba flag ``'x'``), whitespace and special symbols each
        become their own chunk. Nouns/verbs/adjectives (flags ``n``/``v``/``a``)
        become their own chunk, split into 2-character pieces when longer than
        3 characters. All other words are accumulated and flushed once the
        accumulated chunk reaches 3 characters.

        Returns:
            The list of chunks in input order.
        """
        chunks: List[str] = []
        pending = ""

        def flush_then_add(pieces: List[str]) -> None:
            """Emit the accumulated chunk (if any), then the given pieces."""
            nonlocal pending
            if pending:
                chunks.append(pending)
            chunks.extend(pieces)
            pending = ""

        for word, flag in pseg.cut(text):
            # Punctuation, whitespace and special symbols stand alone.
            if flag == 'x' or word.isspace() or _SPECIAL_CHAR_RE.match(word):
                flush_then_add([word])
                continue

            if flag in ('n', 'v', 'a'):
                # Content words stand alone; long ones are typed in pieces.
                if len(word) > 3:
                    flush_then_add(self._split_long_word(word))
                else:
                    flush_then_add([word])
            else:
                # Function words etc. are merged into the running chunk.
                pending += word
                if len(pending) >= 3:
                    chunks.append(pending)
                    pending = ""

        if pending:
            chunks.append(pending)

        return chunks

    def _split_long_word(self, word: str) -> List[str]:
        """Split ``word`` into consecutive 2-character pieces.

        The last piece holds the remaining single character when the length
        is odd. An empty word yields an empty list.
        """
        return [word[i:i + 2] for i in range(0, len(word), 2)]

    def _update_state(self, chars_typed: int = 1) -> bool:
        """Account for ``chars_typed`` characters and update fatigue/attention.

        Args:
            chars_typed: Number of characters to account for.

        Returns:
            ``True`` when a break is due (fatigue above 0.7 or attention
            below 0.5), otherwise ``False``.
        """
        now = time.time()

        # Only count toward continuous typing when the last break was more
        # than 5 seconds ago.
        if now - self.state['last_break_time'] > 5:
            self.state['continuous_typing'] += chars_typed

        # Fatigue grows linearly with typed characters, capped at 1.0.
        fatigue_increase = chars_typed / self.config['fatigue_threshold']
        self.state['fatigue'] = min(1.0, self.state['fatigue'] + fatigue_increase)

        # Attention decays once more than 100 characters were typed without a break.
        if self.state['continuous_typing'] > 100:
            self.state['attention'] *= 0.95

        self.state['chars_typed'] += chars_typed

        return self.state['fatigue'] > 0.7 or self.state['attention'] < 0.5

    def _take_break(self) -> None:
        """Apply the effect of a break: halve fatigue and restore attention."""
        self.state['fatigue'] *= 0.5
        self.state['attention'] = min(1.0, self.state['attention'] * 1.5)
        self.state['continuous_typing'] = 0
        self.state['last_break_time'] = time.time()

    def _get_current_error_rate(self) -> float:
        """Return the current typo probability.

        The rate rises with fatigue and with lost attention, and is capped at
        ``config['error_rate_max']``.
        """
        base_rate = self.config['error_rate_base']
        max_rate = self.config['error_rate_max']
        fatigue_factor = self.state['fatigue'] * (max_rate - base_rate)
        attention_factor = (1 - self.state['attention']) * 0.05
        return min(max_rate, base_rate + fatigue_factor + attention_factor)

    async def _simulate_error(self, char: str) -> None:
        """Type a wrong character, delete it, then type ``char``.

        A CJK character is replaced by a random common CJK character, anything
        else by a random home-row key.
        """
        is_cjk = '\u4e00' <= char <= '\u9fff'
        wrong_chars = '的地得了着过去来到和与及' if is_cjk else 'asdfjkl;'
        wrong_char = random.choice(wrong_chars)

        # Type the wrong character and take a moment to "notice" it.
        await self.page.keyboard.type(wrong_char)
        await asyncio.sleep(random.uniform(0.2, 0.5))

        # Delete it, pause briefly, then type the intended character.
        await self.page.keyboard.press('Backspace')
        await asyncio.sleep(random.uniform(0.1, 0.3))
        await self.page.keyboard.type(char)

    async def _simulate_distraction(self) -> None:
        """Pause for 0.8-1.5 seconds and treat the pause as a break."""
        await asyncio.sleep(random.uniform(0.8, 1.5))
        self._take_break()

    def _group_semantic_blocks(self, chunks: List[str]) -> List[List[str]]:
        """Group chunks into semantic blocks.

        A block ends at a punctuation chunk, or once it holds a randomly
        chosen 2-3 non-punctuation chunks (the threshold is re-drawn for
        every chunk).
        """
        blocks: List[List[str]] = []
        current: List[str] = []
        word_count = 0

        for chunk in chunks:
            current.append(chunk)
            if chunk in _PUNCTUATION:
                # Punctuation always closes the current block.
                blocks.append(current)
                current = []
                word_count = 0
            else:
                word_count += 1
                if word_count >= random.randint(2, 3):
                    blocks.append(current)
                    current = []
                    word_count = 0

        if current:
            blocks.append(current)

        return blocks

    async def _type_char(self, char: str) -> None:
        """Type one character with a fatigue/attention-adjusted delay."""
        fatigue = self.state['fatigue']

        if char in _PUNCTUATION:
            delay = random.randint(*self.config['punct_delay'])
            # Fatigue slows punctuation down.
            delay = int(delay * (1 + fatigue * 0.5))
            await self.page.keyboard.type(char, delay=delay)
            # Always pause after punctuation.
            await asyncio.sleep(random.uniform(*self.config['word_pause']))
        else:
            delay = random.randint(*self.config['char_delay'])
            # Fatigue slows typing down; attention speeds it up.
            delay = int(delay * (1 + fatigue * 0.5 - self.state['attention'] * 0.2))
            await self.page.keyboard.type(char, delay=delay)

    async def _type_paragraph(self, paragraph: str) -> None:
        """Type one paragraph chunk by chunk with pauses, breaks and typos."""
        chunks = self._split_text_into_chunks(paragraph)
        semantic_blocks = self._group_semantic_blocks(chunks)

        for semantic_block in semantic_blocks:
            # Occasionally pause to "think" before a semantic block.
            if random.random() < self.config['natural_pause']:
                await asyncio.sleep(random.uniform(*self.config['thought_pause']))

            for chunk in semantic_block:
                # Account for the chunk; take a short break when one is due.
                if self._update_state(len(chunk)):
                    await asyncio.sleep(random.uniform(0.5, 1.0))
                    self._take_break()

                # Occasional distraction, only before chunks longer than 2 chars.
                if (random.random() < self.config['distraction_probability']
                        and len(chunk) > 2):
                    await self._simulate_distraction()

                for char in chunk:
                    # Typos are only simulated once fatigue exceeds 0.6.
                    if self.state['fatigue'] > 0.6:
                        if random.random() < self._get_current_error_rate():
                            await self._simulate_error(char)
                            continue

                    await self._type_char(char)

                    # NOTE(review): the chunk was already counted above via
                    # _update_state(len(chunk)), so characters count twice
                    # toward fatigue. Kept as-is; confirm whether intended.
                    self._update_state()

                # Pause between word chunks; fatigue lengthens it.
                pause_time = random.uniform(*self.config['word_pause'])
                pause_time *= (1 + self.state['fatigue'] * 0.3)
                await asyncio.sleep(pause_time)

            # Pause between semantic blocks; fatigue lengthens it, attention
            # shortens it.
            pause_time = random.uniform(*self.config['chunk_pause'])
            pause_time *= (1 + self.state['fatigue'] * 0.5
                           - self.state['attention'] * 0.2)
            await asyncio.sleep(pause_time)

    async def _prepare_input(self, selector: str) -> None:
        """Wait for the input element, click it, and pause briefly.

        Raises:
            Exception: Whatever the page raises (e.g. on timeout after 5000 ms)
                is printed and re-raised.
        """
        try:
            element = await self.page.wait_for_selector(selector, timeout=5000)
            await element.click()
            await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            print(f"准备输入失败: {e}")
            raise


class OptimizedXHSTyping:
    """Simple character-by-character typing simulator for Xiaohongshu."""

    def __init__(self, page):
        """Store the page handle and the default typing configuration."""
        self.page = page
        self.typing_config = {
            'char_delay': (100, 200),        # per-character typing delay range
            'punct_delay': (200, 300),       # punctuation typing delay range
            'paragraph_pause': (0.5, 1.0),   # pause between paragraphs (seconds)
            'natural_pause': 0.05            # probability of a random pause
        }

    async def type_text(self, text: str) -> None:
        """Type ``text``, pausing between blank-line-separated paragraphs."""
        paragraphs = text.split('\n\n')
        last_index = len(paragraphs) - 1

        for index, para in enumerate(paragraphs):
            await self._type_paragraph(para)

            if index < last_index:
                await asyncio.sleep(
                    random.uniform(*self.typing_config['paragraph_pause'])
                )

    async def _type_paragraph(self, paragraph: str) -> None:
        """Type one paragraph character by character with random pauses."""
        char_count = 0
        for char in paragraph:
            # Occasional natural pause before a character.
            if random.random() < self.typing_config['natural_pause']:
                await asyncio.sleep(random.uniform(0.2, 0.5))

            delay_key = 'punct_delay' if char in _PUNCTUATION else 'char_delay'
            delay = random.randint(*self.typing_config[delay_key])

            await self.page.keyboard.type(char, delay=delay)
            char_count += 1

            # Pause when the running count is a multiple of a freshly drawn
            # number in 20..30 (the divisor is re-drawn for every character).
            if char_count % random.randint(20, 30) == 0:
                await asyncio.sleep(random.uniform(0.1, 0.3))


class XHSEnhancedTyping(EnhancedHumanTypingSimulator):
    """Xiaohongshu-specific typing simulator with hashtag input support."""

    def __init__(self, page=None):
        """Initialize the base simulator with tag mode switched off."""
        super().__init__(page)
        self.tag_mode = False

    async def type_text(self, text: str, selector: Optional[str] = None) -> bool:
        """Record the speed profile for the current mode, then type ``text``.

        The ``min_typing_speed`` / ``max_typing_speed`` values are stored in
        ``self.config``; no code in this file reads them.

        Returns:
            The result of the base class ``type_text``.
        """
        if self.tag_mode:
            # Tag mode uses the slower speed profile.
            speed_profile = {'min_typing_speed': 5, 'max_typing_speed': 12}
        else:
            # Normal text mode.
            speed_profile = {'min_typing_speed': 8, 'max_typing_speed': 20}

        # The base class keeps its settings in ``self.config``.
        self.config.update(speed_profile)

        return await super().type_text(text, selector)

    async def handle_tag_input(self, tag: str) -> None:
        """Type ``#`` followed by ``tag``, then pick a suggestion or press Enter.

        With 70% probability one of the first two ``.suggestion-item``
        elements is clicked, if any exist. Otherwise, or if that fails, Enter
        is pressed. ``tag_mode`` is always reset on exit.
        """
        self.tag_mode = True
        try:
            # Type '#': hold Shift while pressing "3". Shift must be held with
            # down()/up(); press() would release it immediately.
            await self.page.keyboard.down("Shift")
            try:
                await asyncio.sleep(random.uniform(0.1, 0.2))
                await self.page.keyboard.press("3")
            finally:
                # Never leave the modifier stuck, even if the press fails.
                await self.page.keyboard.up("Shift")

            await self.type_text(tag)

            # Give the suggestion list time to appear.
            await asyncio.sleep(random.uniform(0.3, 0.5))

            if random.random() < 0.7:
                try:
                    suggestions = await self.page.query_selector_all(
                        '.suggestion-item'
                    )
                    if suggestions:
                        await random.choice(suggestions[:2]).click()
                        await asyncio.sleep(random.uniform(0.2, 0.4))
                        return
                except Exception as e:
                    # Best effort: fall back to Enter below. Only Exception is
                    # caught so task cancellation is not swallowed.
                    print(f"选择标签建议失败: {e}")

            # No suggestion chosen: confirm the tag with Enter.
            await self.page.keyboard.press("Enter")
            await asyncio.sleep(random.uniform(0.2, 0.4))
        finally:
            self.tag_mode = False