From b954c8adcf2fc9f32fd9c10b32e003ec74f55ce6 Mon Sep 17 00:00:00 2001 From: sini_chen <3161534962@qq.com> Date: Mon, 20 Oct 2025 09:50:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=B1=BB=E4=BA=BA=E6=89=93?= =?UTF-8?q?=E5=AD=97=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- uploader/xiaohongshu_uploader/main.py | 632 +++++++++++--------------- utils/enhanced_human_typing.py | 534 ++++++++++++++-------- 2 files changed, 622 insertions(+), 544 deletions(-) diff --git a/uploader/xiaohongshu_uploader/main.py b/uploader/xiaohongshu_uploader/main.py index 7f19555..0ab02fc 100644 --- a/uploader/xiaohongshu_uploader/main.py +++ b/uploader/xiaohongshu_uploader/main.py @@ -441,7 +441,7 @@ class XiaoHongShuVideo(object): class XiaoHongShuImage(object): - def __init__(self, title, image_paths, tags, publish_date: datetime, account_file, location=None, content=None, headless=True): + def __init__(self, title, image_paths, tags, publish_date: datetime, account_file, location=None, content=None, headless=True, use_enhanced_typing=True): self.title = title # 图文标题 self.image_paths = image_paths if isinstance(image_paths, list) else [image_paths] # 支持单张或多张图片 self.tags = tags @@ -452,27 +452,35 @@ class XiaoHongShuImage(object): self.date_format = '%Y年%m月%d日 %H:%M' self.local_executable_path = LOCAL_CHROME_PATH self.headless = headless + self.use_enhanced_typing = use_enhanced_typing # 是否使用增强版输入 async def set_schedule_time_xiaohongshu(self, page, publish_date): """设置定时发布时间""" - print(" [-] 正在设置定时发布时间...") - print(f"publish_date: {publish_date}") - - # 选择包含特定文本内容的 label 元素 - label_element = page.locator("label:has-text('定时发布')") - # 在选中的 label 元素下点击 checkbox - await label_element.click() - await asyncio.sleep(1) - publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M") - print(f"publish_date_hour: {publish_date_hour}") - - await asyncio.sleep(1) - await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click() - await page.keyboard.press("Control+KeyA") - await page.keyboard.type(str(publish_date_hour)) - await page.keyboard.press("Enter") - - await asyncio.sleep(1) + xiaohongshu_logger.info(" [-] 正在设置定时发布时间...") + + try: + # 定位并点击定时发布复选框 + schedule_checkbox = await page.wait_for_selector('input[type="checkbox"]', timeout=3000) + await schedule_checkbox.click() + await asyncio.sleep(random.uniform(0.5, 1.0)) + + # 定位并点击时间输入框 + date_input = await page.wait_for_selector('input[placeholder="选择日期和时间"]', timeout=3000) + await date_input.click() + await asyncio.sleep(random.uniform(0.3, 0.5)) + + # 输入发布时间 + publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M") + await date_input.fill(publish_date_str) + await asyncio.sleep(random.uniform(0.3, 0.5)) + await page.keyboard.press("Enter") + + xiaohongshu_logger.success(f" [-] 定时发布时间设置完成: {publish_date_str}") + return True + + except Exception as e: + xiaohongshu_logger.error(f" [-] 设置定时发布时间失败: {e}") + return False async def upload_images(self, page): """上传图片""" @@ -480,50 +488,26 @@ class XiaoHongShuImage(object): xiaohongshu_logger.info(f'[+] 正在上传图片,共{len(self.image_paths)}张') # 等待页面加载 - await asyncio.sleep(3) - - # 查找上传元素(简化选择器,移除详细日志) - upload_selectors = [ - "input[class='upload-input'][type='file'][multiple]", - "input[accept='.jpg,.jpeg,.png,.webp']", - "input[type='file'][multiple]", - "input[type='file']", - ] - - upload_input = None - for selector in upload_selectors: - try: - upload_input = await page.wait_for_selector(selector, timeout=3000) - if upload_input: - break - except: - continue - - if not upload_input: - # 尝试点击上传按钮 - try: - upload_button = await page.wait_for_selector('button:has-text("上传图片")', timeout=3000) - if not upload_button: - upload_button = await page.wait_for_selector('div:has-text("上传图片")', timeout=3000) - - if upload_button: - await upload_button.click() - await asyncio.sleep(2) - upload_input = await page.wait_for_selector("input[type='file']", timeout=3000) - - if not upload_input: - raise Exception("未找到图片上传元素") - except Exception as e: - raise Exception(f"图片上传失败: {e}") - - # 上传图片(显示文件名而不是完整路径) - file_names = [Path(p).name for p in self.image_paths] - xiaohongshu_logger.info(f" [-] 上传文件: {', '.join(file_names)}") - await upload_input.set_input_files(self.image_paths) - - # 等待上传完成 await asyncio.sleep(2) - await self.wait_for_images_upload_complete(page) + + try: + # 直接定位上传输入框 + upload_input = await page.wait_for_selector("input[type='file']", timeout=5000) + if not upload_input: + raise Exception("未找到图片上传元素") + + # 上传图片 + file_names = [Path(p).name for p in self.image_paths] + xiaohongshu_logger.info(f" [-] 上传文件: {', '.join(file_names)}") + await upload_input.set_input_files(self.image_paths) + + # 等待上传完成 + await asyncio.sleep(2) + await self.wait_for_images_upload_complete(page) + + except Exception as e: + xiaohongshu_logger.error(f"图片上传失败: {e}") + raise async def wait_for_images_upload_complete(self, page): """等待图片上传完成""" @@ -532,58 +516,39 @@ class XiaoHongShuImage(object): while wait_count < max_wait_time: try: - # 简化检查逻辑,移除详细日志 - # 检查添加按钮 - add_selectors = [ - 'div.entry:has-text("添加")', - 'div:has-text("添加")', - '[class*="add"]:has-text("添加")' - ] - - for selector in add_selectors: - try: - add_button = await page.query_selector(selector) - if add_button: - xiaohongshu_logger.success(" [-] 图片上传完成") - return - except: - continue - # 检查图片预览 - try: - images = await page.query_selector_all('img') - valid_images = [] - for img in images: - src = await img.get_attribute('src') - if src and ('data:image' in src or 'blob:' in src or len(src) > 50): - valid_images.append(img) - - if len(valid_images) >= len(self.image_paths): - xiaohongshu_logger.success(f" [-] 图片上传完成 ({len(valid_images)}张)") - await asyncio.sleep(2) - return - except: - pass + images = await page.query_selector_all('img') + valid_images = [img for img in images if await img.get_attribute('src')] - # 检查加载状态 - loading_elements = await page.query_selector_all('[class*="loading"], [class*="uploading"]') - if not loading_elements: - xiaohongshu_logger.success(" [-] 图片上传完成") + if len(valid_images) >= len(self.image_paths): + xiaohongshu_logger.success(f" [-] 图片上传完成 ({len(valid_images)}张)") + # 随机等待一小段时间确保图片完全加载 + await asyncio.sleep(random.uniform(1.5, 2.5)) return - # 减少日志频率:每15秒输出一次进度 - if wait_count % 15 == 0 and wait_count > 0: + # 检查是否还在上传 + loading = await page.query_selector('[class*="loading"], [class*="uploading"]') + if not loading: + # 再次检查图片数量 + images = await page.query_selector_all('img') + if len(images) >= len(self.image_paths): + xiaohongshu_logger.success(" [-] 图片上传完成") + await asyncio.sleep(random.uniform(1.0, 2.0)) + return + + # 每10秒输出一次进度 + if wait_count % 10 == 0: xiaohongshu_logger.info(f" [-] 等待图片上传... ({wait_count}/{max_wait_time}秒)") - await asyncio.sleep(3) - wait_count += 3 + await asyncio.sleep(random.uniform(2.0, 3.0)) + wait_count += 2 except Exception as e: - xiaohongshu_logger.debug(f" [-] 检查上传状态出错: {e}") - await asyncio.sleep(3) - wait_count += 3 + xiaohongshu_logger.error(f" [-] 检查上传状态出错: {e}") + await asyncio.sleep(random.uniform(1.0, 2.0)) + wait_count += 2 - xiaohongshu_logger.warning(" [-] 图片上传等待超时,继续流程") + raise Exception("图片上传超时") async def locate_content_editor(self, page): """定位正文编辑区域""" @@ -615,105 +580,78 @@ class XiaoHongShuImage(object): """填充标题和内容""" xiaohongshu_logger.info(f' [-] 正在填充标题和话题...') - # 使用传入的人类化输入包装器(避免重复创建) + # 等待页面加载 + await asyncio.sleep(2) # 填充标题 title_container = page.locator('div.plugin.title-container').locator('input.d-text') if await title_container.count(): # 使用人类化输入填充标题 - success = await human_typer.type_text_human( - 'div.plugin.title-container input.d-text', - self.title[:30], - clear_first=True - ) + await title_container.click() + await asyncio.sleep(0.5) + await page.keyboard.press("Control+A") + await page.keyboard.press("Delete") + await asyncio.sleep(0.3) - if not success: - xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式") - await title_container.fill(self.title[:30]) + # 使用视频上传中的标题输入方式 + for char in self.title[:30]: + await page.keyboard.type(char, delay=random.randint(100, 200)) + await asyncio.sleep(random.uniform(0.05, 0.15)) + + await asyncio.sleep(0.5) else: - # 使用人类化输入的备用方案 - success = await human_typer.type_text_human(".notranslate", self.title, clear_first=True) - if not success: - xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式") - titlecontainer = page.locator(".notranslate") - await titlecontainer.click() - await page.keyboard.press("Backspace") - await page.keyboard.press("Control+KeyA") - await page.keyboard.press("Delete") - await page.keyboard.type(self.title) - await page.keyboard.press("Enter") + # 使用备用方案 + titlecontainer = page.locator(".notranslate") + await titlecontainer.click() + await asyncio.sleep(0.5) + await page.keyboard.press("Control+A") + await page.keyboard.press("Delete") + await asyncio.sleep(0.3) + + for char in self.title: + await page.keyboard.type(char, delay=random.randint(100, 200)) + await asyncio.sleep(random.uniform(0.05, 0.15)) + + await asyncio.sleep(0.5) # 定位正文编辑区域 content_element, css_selector = await self.locate_content_editor(page) - # 🔧 创建专门用于正文输入的人类化输入包装器 - from utils.human_typing_wrapper import HumanTypingWrapper - - # 根据正文长度调整输入速度配置 - content_length = len(self.content) if self.content else len(self.title) + 2 - - # 为长文本使用更慢的输入速度,提高真实性 - if content_length > 100: - # 长文本:更慢更谨慎 - content_config = { - 'min_delay': 80, # 最小延迟80ms - 'max_delay': 200, # 最大延迟200ms - 'pause_probability': 0.15, # 15%概率暂停思考 - 'pause_min': 800, # 暂停最少800ms - 'pause_max': 2000, # 暂停最多2秒 - 'correction_probability': 0.02, # 2%概率打错字 - 'backspace_probability': 0.01, # 1%概率退格重输 - } - xiaohongshu_logger.info(f" [-] 长文本模式 ({content_length}字符),使用慢速人类化输入") - else: - # 短文本:相对较快但仍然人类化 - content_config = { - 'min_delay': 60, # 最小延迟60ms - 'max_delay': 150, # 最大延迟150ms - 'pause_probability': 0.1, # 10%概率暂停 - 'pause_min': 500, # 暂停最少500ms - 'pause_max': 1200, # 暂停最多1.2秒 - 'correction_probability': 0.01, # 1%概率打错字 - 'backspace_probability': 0.005, # 0.5%概率退格 - } - xiaohongshu_logger.info(f" [-] 短文本模式 ({content_length}字符),使用标准人类化输入") - - # 创建专门的正文输入器 - content_typer = HumanTypingWrapper(page, content_config) - # 准备正文内容 if self.content: - # 如果有自定义正文内容,使用自定义内容 content_text = self.content xiaohongshu_logger.info(f" [-] 使用自定义正文内容,长度: {len(content_text)} 字符") else: - # 如果没有自定义内容,使用标题作为开头 content_text = f"{self.title}\n\n" xiaohongshu_logger.info(" [-] 使用默认正文内容(标题)") - - # 🔧 使用优化的人类化输入正文 - xiaohongshu_logger.info(f" [-] 开始人类化输入正文内容...") - - # 对于长文本,分段输入更加真实 - if content_length > 200: - xiaohongshu_logger.info(" [-] 长文本分段输入模式") - success = await self._input_long_content_in_segments(page, content_typer, css_selector, content_text) - else: - # 短文本直接输入 - success = await content_typer.type_text_human( - css_selector, - content_text, - clear_first=True - ) - - if not success: - xiaohongshu_logger.warning(" [-] 正文人类化输入失败,使用传统方式") - await content_element.click() - await asyncio.sleep(0.5) # 点击后稍作等待 + + try: + # 使用增强版人类输入模拟器 + from utils.enhanced_human_typing import EnhancedHumanTypingSimulator + human_typer = EnhancedHumanTypingSimulator(page) - # 传统方式也要模拟人类输入速度 - xiaohongshu_logger.info(" [-] 使用传统方式进行人类化输入...") - await self._fallback_human_typing(page, content_text) + # 输入正文内容 + success = await human_typer.type_text(content_text, css_selector) + + if not success: + xiaohongshu_logger.error(" [-] 增强版输入失败,尝试使用备用方案") + # 点击并清空输入区域 + await content_element.click() + await asyncio.sleep(random.uniform(0.3, 0.5)) + await page.keyboard.press("Control+A") + await page.keyboard.press("Delete") + await asyncio.sleep(random.uniform(0.2, 0.4)) + + # 使用简单的输入方式 + for char in content_text: + await page.keyboard.type(char, delay=random.randint(100, 200)) + await asyncio.sleep(random.uniform(0.05, 0.1)) + + xiaohongshu_logger.success(f" [-] 正文输入完成,共 {len(content_text)} 字符") + + except Exception as e: + xiaohongshu_logger.error(f" [-] 正文输入失败: {e}") + raise xiaohongshu_logger.success(f" [-] 正文输入完成,共 {len(content_text)} 字符") @@ -732,50 +670,72 @@ class XiaoHongShuImage(object): # 添加两个换行,将标签与正文分开 await page.keyboard.press("Enter") await page.keyboard.press("Enter") - await asyncio.sleep(0.3) - - # 标签输入(参考视频标签添加方式) + await asyncio.sleep(0.5) + + # 输入标签 xiaohongshu_logger.info(f" [-] 开始输入标签 ({len(self.tags)}个)...") - # 创建标签输入器 + # 创建专门用于慢速标签输入的人类化输入包装器 from utils.human_typing_wrapper import HumanTypingWrapper - tag_config = { - 'min_delay': 400, 'max_delay': 700, 'pause_probability': 0.25, - 'pause_min': 400, 'pause_max': 1000, 'correction_probability': 0.02, - 'backspace_probability': 0.01, - } - tag_typer = HumanTypingWrapper(page, tag_config) - # 输入标签(简化日志) + slow_config = { + 'min_delay': 500, # 最小延迟150ms(更慢) + 'max_delay': 800, # 最大延迟300ms + 'pause_probability': 0.3, # 30%概率暂停 + 'pause_min': 500, # 暂停最少500ms + 'pause_max': 1200, # 暂停最多1200ms + 'correction_probability': 0.0, # 禁用错误修正 + 'backspace_probability': 0.0, # 禁用退格重输 + } + + # 创建专门的慢速输入器 + slow_typer = HumanTypingWrapper(page, slow_config) + + # 逐个标签输入,每个标签后都有停顿 success = True for i, tag in enumerate(self.tags): - # 标签间思考时间 - if i > 0: - import random - await asyncio.sleep(random.uniform(0.8, 1.5)) + tag_text = f"#{tag}" - # 输入标签 - tag_success = await tag_typer.type_text_human( - css_selector, f"#{tag}", clear_first=False + # 输入标签文本(使用慢速配置) + # 先输入#号(需要按Shift+3) + await page.keyboard.press("Shift") + await asyncio.sleep(random.uniform(0.1, 0.2)) + await page.keyboard.press("Digit3") + await page.keyboard.up("Shift") + await asyncio.sleep(random.uniform(0.2, 0.4)) + + # 输入标签内容 + tag_success = await slow_typer.type_text_human( + css_selector, + tag, + clear_first=False ) if not tag_success: success = False break + + # 输入换行符并添加停顿 + await page.keyboard.press("Enter") + await page.wait_for_timeout(800) # 换行后停顿800ms - # 处理标签建议 - await self._handle_tag_suggestions_after_input(page, tag) - - # 标签间分隔 - if i < len(self.tags) - 1: - await page.keyboard.type(" ") - import random - await asyncio.sleep(random.uniform(0.2, 0.5)) + xiaohongshu_logger.info(f"已输入标签: {tag} ({i+1}/{len(self.tags)})") - # 备用输入方式 if not success: - xiaohongshu_logger.warning(" [-] 使用备用标签输入方式") - await self._fallback_tag_input(page, css_selector) + xiaohongshu_logger.warning("标签人类化输入失败,使用传统方式") + await page.click(css_selector) + for index, tag in enumerate(self.tags, start=1): + # 输入#号(需要按Shift+3) + await page.keyboard.press("Shift") + await asyncio.sleep(random.uniform(0.1, 0.2)) + await page.keyboard.press("Digit3") + await page.keyboard.up("Shift") + await asyncio.sleep(random.uniform(0.2, 0.4)) + + for char in tag: + await page.keyboard.type(char, delay=500) + await page.wait_for_timeout(1000) + await page.keyboard.press("Enter") xiaohongshu_logger.success(f' [-] 标签输入完成 ({len(self.tags)}个)') @@ -856,9 +816,19 @@ class XiaoHongShuImage(object): # 段落间添加换行和思考时间 if i < len(paragraphs): + # 先停顿一下,模拟思考下一段内容 + await asyncio.sleep(random.uniform(2.0, 4.0)) + + # 输入第一个换行,短暂停顿 await page.keyboard.press("Enter") + await asyncio.sleep(random.uniform(0.3, 0.6)) + + # 输入第二个换行,再次短暂停顿 await page.keyboard.press("Enter") - await asyncio.sleep(random.uniform(1.0, 3.0)) + await asyncio.sleep(random.uniform(0.5, 1.0)) + + # 段落间再次思考 + await asyncio.sleep(random.uniform(1.5, 3.0)) xiaohongshu_logger.success(" [-] 分段输入完成") return True @@ -1135,64 +1105,24 @@ class XiaoHongShuImage(object): xiaohongshu_logger.info(f" [-] 开始设置地理位置: {location}") try: - # 1. 点击地点输入框 - xiaohongshu_logger.info(" [-] 点击地点输入框...") - selectors = [ - 'div.d-select--color-text-title--color-bg-fill', - 'div.d-text.d-select-placeholder.d-text-ellipsis.d-text-nowrap', - 'div[class*="d-select"]' - ] + # 定位并点击地点选择框 + location_input = await page.wait_for_selector('div[class*="d-select"]', timeout=3000) + await location_input.click() + await asyncio.sleep(random.uniform(0.3, 0.5)) - clicked = False - for selector in selectors: - try: - element = await page.wait_for_selector(selector, timeout=3000) - await element.click() - clicked = True - break - except: - continue - - if not clicked: - xiaohongshu_logger.error(" [-] 未找到地点输入框") - return False - - # 2. 输入地点名称 - xiaohongshu_logger.info(f" [-] 输入地点名称: {location}") - await page.keyboard.press("Control+a") + # 输入地点名称 await page.keyboard.type(location) - await asyncio.sleep(2) # 等待下拉选项加载 + await asyncio.sleep(random.uniform(1.0, 1.5)) # 等待下拉列表加载 - # 3. 选择匹配的地点选项 - xiaohongshu_logger.info(" [-] 查找匹配的地点选项...") + # 选择第一个匹配的选项 + option = await page.wait_for_selector(f'div[class*="name"]:has-text("{location}")', timeout=3000) + if option: + await option.click() + xiaohongshu_logger.success(f" [-] 成功选择地点: {location}") + return True - # 尝试多种选择器找到包含地点名称的选项 - option_selectors = [ - f'//div[contains(@class, "name") and contains(text(), "{location}")]', - f'//div[contains(text(), "{location}市")]', - f'//div[contains(text(), "{location}")]' - ] - - selected = False - for selector in option_selectors: - try: - options = await page.query_selector_all(selector) - if options: - # 选择第一个匹配的选项 - option = options[0] - option_text = await option.inner_text() - await option.click() - xiaohongshu_logger.success(f" [-] 成功选择地点: {option_text}") - selected = True - break - except: - continue - - if not selected: - xiaohongshu_logger.warning(f" [-] 未找到匹配的地点选项: {location}") - return False - - return True + xiaohongshu_logger.warning(f" [-] 未找到匹配的地点选项: {location}") + return False except Exception as e: xiaohongshu_logger.error(f" [-] 设置地理位置失败: {e}") @@ -1200,118 +1130,98 @@ class XiaoHongShuImage(object): async def upload(self, playwright: Playwright) -> None: """主要的上传流程""" - # 🔧 使用增强的反检测浏览器配置 - from utils.anti_detection import AntiDetectionConfig - import random - - # 反检测浏览器参数 - browser_args = AntiDetectionConfig.STANDARD_BROWSER_ARGS.copy() - # 使用 Chromium 浏览器启动一个浏览器实例 if self.local_executable_path: browser = await playwright.chromium.launch( headless=self.headless, - executable_path=self.local_executable_path, - args=browser_args # 🔧 添加反检测参数 + executable_path=self.local_executable_path ) else: browser = await playwright.chromium.launch( - headless=self.headless, - args=browser_args # 🔧 添加反检测参数 + headless=self.headless ) - - # 🔧 创建增强的浏览器上下文 - context_options = { - "storage_state": f"{self.account_file}", - "locale": "zh-CN", - "timezone_id": "Asia/Shanghai" - } - # 🔧 为无头模式添加完整的反检测设置 - if self.headless: - context_options.update({ - 'viewport': {'width': 1920, 'height': 1080}, # 🔧 使用文档建议的分辨率 - 'device_scale_factor': 1, - 'has_touch': False, - 'is_mobile': False - }) - - # 使用随机用户代理 - user_agent = random.choice(AntiDetectionConfig.REAL_USER_AGENTS) - context_options["user_agent"] = user_agent - xiaohongshu_logger.info(f" [-] 无头模式设置: 1920x1080") - xiaohongshu_logger.info(f" [-] 使用用户代理: {user_agent[:50]}...") - else: - # 有头模式使用较小的窗口 - context_options["viewport"] = {"width": 1600, "height": 900} - xiaohongshu_logger.info(f" [-] 有头模式设置: 1600x900") - - context = await browser.new_context(**context_options) + # 创建一个浏览器上下文,使用基本配置 + context = await browser.new_context( + viewport={"width": 1600, "height": 900}, + storage_state=f"{self.account_file}" + ) context = await set_init_script(context) # 创建一个新的页面 page = await context.new_page() # 🔧 创建人类化输入包装器(关键修复) - human_typer = create_human_typer(page) - xiaohongshu_logger.info(" [-] 已创建人类化输入包装器") - - # 直接访问小红书图文发布页面 - await page.goto("https://creator.xiaohongshu.com/publish/publish?from=tab_switch&target=image") - xiaohongshu_logger.info(f'[+]正在上传图文-------{self.title}') - - # 等待页面加载 - xiaohongshu_logger.info(f'[-] 正在打开图文发布页面...') - await page.wait_for_url("https://creator.xiaohongshu.com/publish/publish*") - - # 上传图片 - await self.upload_images(page) - - # 填充内容(传递人类化输入包装器) - await self.fill_content(page, human_typer) - - # 设置位置(如果有指定地点) - if self.location and self.location.strip(): - xiaohongshu_logger.info(f" [-] 开始设置地理位置: {self.location}") - await self.set_location(page, self.location) + if self.use_enhanced_typing: + from utils.enhanced_human_typing import EnhancedHumanTypingSimulator + human_typer = EnhancedHumanTypingSimulator(page) + xiaohongshu_logger.info(" [-] 已创建增强版人类化输入模拟器") else: - xiaohongshu_logger.info(" [-] 未指定地点或地点为空,跳过位置设置") + human_typer = create_human_typer(page) + xiaohongshu_logger.info(" [-] 已创建标准人类化输入包装器") - # 设置定时发布(如果需要) - if self.publish_date != 0: - await self.set_schedule_time_xiaohongshu(page, self.publish_date) + try: + # 直接访问小红书图文发布页面 + await page.goto("https://creator.xiaohongshu.com/publish/publish?from=tab_switch&target=image") + xiaohongshu_logger.info(f'[+]正在上传图文-------{self.title}') + + # 等待页面加载 + xiaohongshu_logger.info(f'[-] 正在打开图文发布页面...') + await page.wait_for_url("https://creator.xiaohongshu.com/publish/publish*") + await asyncio.sleep(2) # 等待页面完全加载 + + # 上传图片 + await self.upload_images(page) + + # 等待页面稳定 + await asyncio.sleep(3) # 增加等待时间,确保页面稳定 + + # 填充内容(传递人类化输入包装器) + await self.fill_content(page, human_typer) + + # 设置位置(如果有指定地点) + if self.location and self.location.strip(): + xiaohongshu_logger.info(f" [-] 开始设置地理位置: {self.location}") + await self.set_location(page, self.location) + else: + xiaohongshu_logger.info(" [-] 未指定地点或地点为空,跳过位置设置") + + # 设置定时发布(如果需要) + if self.publish_date != 0: + await self.set_schedule_time_xiaohongshu(page, self.publish_date) + except Exception as e: + xiaohongshu_logger.error(f"页面操作出错: {e}") + # 保存页面截图以便调试 + await page.screenshot(path="error_screenshot.png", full_page=True) + raise - # 发布图文(增强反检测等待策略) + # 发布图文 xiaohongshu_logger.info(" [-] 准备发布图文...") - await asyncio.sleep(1) # 发布前等待 + await asyncio.sleep(random.uniform(0.5, 1.0)) # 发布前等待 - while True: - try: - # 等待并点击发布按钮 - if self.publish_date != 0: - xiaohongshu_logger.info(" [-] 点击定时发布按钮...") - await page.locator('button:has-text("定时发布")').click() - else: - xiaohongshu_logger.info(" [-] 点击发布按钮...") - await page.locator('button:has-text("发布")').click() - - # 增加发布后的等待时间 - await asyncio.sleep(1) - - await page.wait_for_url( - "https://creator.xiaohongshu.com/publish/success?**", - timeout=5000 # 增加超时时间到5秒 - ) - xiaohongshu_logger.success(" [-]图文发布成功") - break - except Exception as e: - xiaohongshu_logger.info(" [-] 图文正在发布中...") - xiaohongshu_logger.debug(f" [-] 等待详情: {str(e)}") - await page.screenshot(full_page=True) - # 使用随机等待时间,模拟人类行为 - import random - wait_time = random.uniform(1.0, 2.0) # 1-2秒随机等待 - await asyncio.sleep(wait_time) + try: + # 定位并点击发布按钮 + button_text = "定时发布" if self.publish_date != 0 else "发布" + publish_button = await page.wait_for_selector(f'button:has-text("{button_text}")', timeout=3000) + + if not publish_button: + raise Exception(f"未找到{button_text}按钮") + + # 点击发布按钮 + await publish_button.click() + await asyncio.sleep(random.uniform(0.5, 1.0)) + + # 等待发布成功 + success_url = "https://creator.xiaohongshu.com/publish/success" + await page.wait_for_url(f"{success_url}?**", timeout=5000) + + xiaohongshu_logger.success(" [-] 图文发布成功") + + except Exception as e: + xiaohongshu_logger.error(f" [-] 发布失败: {e}") + # 保存错误截图 + await page.screenshot(path="publish_error.png", full_page=True) + raise # 保存cookie并关闭浏览器 await context.storage_state(path=self.account_file) diff --git a/utils/enhanced_human_typing.py b/utils/enhanced_human_typing.py index df7059b..50cce7d 100644 --- a/utils/enhanced_human_typing.py +++ b/utils/enhanced_human_typing.py @@ -2,64 +2,68 @@ import time import random import re import asyncio +import jieba +import jieba.posseg as pseg from typing import Dict, List, Optional +# 初始化结巴分词的词典 +jieba.initialize() + class EnhancedHumanTypingSimulator: def __init__(self, page=None): - # 保留原方案的简单配置 - self.base_config = { - 'min_typing_speed': 5, - 'max_typing_speed': 15, - 'pause_probability': 0.1, - 'chunk_input': True, - 'max_chunk_length': 50 - } - - # 新增高级特性配置 - self.advanced_config = { - # 人类状态模拟 - 'energy_level': random.uniform(0.7, 1.0), - 'typing_proficiency': random.uniform(0.6, 0.9), - 'emotion_state': random.uniform(0.8, 1.0), - - # 错误处理 - 'base_error_rate': random.uniform(0.02, 0.05), - 'error_correction_speed': random.uniform(0.3, 0.8), - - # 速度控制 - 'speed_variance': random.uniform(0.1, 0.2), - 'burst_speed_probability': 0.1 - } - self.page = page - self.typing_session = { - 'start_time': None, - 'chars_typed': 0, - 'last_break_time': time.time() + # 优化配置管理 + self.config = { + 'char_delay': (80, 150), # 减少基础字符延迟 + 'punct_delay': (150, 250), # 减少标点符号延迟 + 'paragraph_pause': (0.5, 1.0), # 减少段落停顿 + 'natural_pause': 0.08, # 降低自然停顿概率 + 'thought_pause': (0.2, 0.4), # 减少思考停顿时间 + 'word_pause': (0.1, 0.25), # 减少词语间停顿 + 'chunk_pause': (0.2, 0.4), # 减少语义块停顿 + 'char_count_pause': (25, 35), # 增加字符计数范围 + 'char_count_delay': (0.1, 0.3), # 减少字符计数停顿 + 'fatigue_threshold': 300, # 增加疲劳阈值 + 'error_rate_base': 0.01, # 降低基础错误率 + 'error_rate_max': 0.05, # 降低最大错误率 + 'distraction_probability': 0.02 # 降低分心概率 + } + + # 状态管理 + self.state = { + 'fatigue': 0.0, # 疲劳度 (0-1) + 'attention': 1.0, # 注意力 (0-1) + 'chars_typed': 0, # 已输入字符数 + 'last_break_time': 0, # 上次休息时间 + 'continuous_typing': 0 # 连续输入时间 } async def type_text(self, text: str, selector: str = None) -> bool: - """增强版的文本输入方法""" + """优化的文本输入方法""" try: if selector: - # 等待并点击元素 await self._prepare_input(selector) - # 初始化会话 - self.typing_session['start_time'] = time.time() + # 简单分段 + paragraphs = text.split('\n\n') - # 智能分段 - chunks = self._smart_split_text(text) - - for chunk in chunks: - # 获取当前状态 - current_state = self._get_current_state() + for i, paragraph in enumerate(paragraphs): + if not paragraph.strip(): + continue + + # 段落输入 + await self._type_paragraph(paragraph) - # 输入当前段落 - await self._type_chunk(chunk, current_state) - - # 段落间自然停顿 - await self._natural_pause(current_state) + # 段落间添加换行和思考时间 + if i < len(paragraphs) - 1: + # 段落结束,停顿思考 + await asyncio.sleep(random.uniform(0.5, 1.0)) + # 输入两个换行 + await self.page.keyboard.press("Enter") + await asyncio.sleep(random.uniform(0.1, 0.2)) + await self.page.keyboard.press("Enter") + # 准备输入下一段 + await asyncio.sleep(random.uniform(0.8, 1.5)) return True @@ -67,166 +71,330 @@ class EnhancedHumanTypingSimulator: print(f"输入文本时出错: {e}") return False - def _smart_split_text(self, text: str) -> List[str]: - """智能文本分段""" - paragraphs = text.split('\n') + def _split_text_into_chunks(self, text: str) -> list: + """使用结巴分词进行智能分词""" chunks = [] - for para in paragraphs: - if len(para) <= self.base_config['max_chunk_length']: - if para.strip(): - chunks.append(para) + # 使用结巴词性标注 + words = pseg.cut(text) + + current_chunk = "" + for word, flag in words: + # 处理标点符号 + if flag == 'x': + if current_chunk: + chunks.append(current_chunk) + chunks.append(word) + current_chunk = "" + continue + + # 处理空格 + if word.isspace(): + if current_chunk: + chunks.append(current_chunk) + chunks.append(word) + current_chunk = "" continue - sentences = re.split(r'([。!?,:;])', para) - current_chunk = '' + # 处理表情符号和特殊字符 + if re.match(r'[^\u4e00-\u9fff\w\s]', word): + if current_chunk: + chunks.append(current_chunk) + chunks.append(word) + current_chunk = "" + continue - for sent in sentences: - if len(current_chunk) + len(sent) < self.base_config['max_chunk_length']: - current_chunk += sent - else: - if current_chunk.strip(): + # 根据词性决定是否需要独立成块 + if flag in ['n', 'v', 'a']: # 名词、动词、形容词 + # 如果当前词较长,可能需要再次切分 + if len(word) > 3: + sub_chunks = self._split_long_word(word) + if current_chunk: chunks.append(current_chunk) - current_chunk = sent - - if current_chunk.strip(): + chunks.extend(sub_chunks) + current_chunk = "" + else: + if current_chunk: + chunks.append(current_chunk) + chunks.append(word) + current_chunk = "" + else: + # 对于其他词性,累积到当前块 + current_chunk += word + # 如果累积的块太长,进行切分 + if len(current_chunk) >= 3: + chunks.append(current_chunk) + current_chunk = "" + + # 添加最后剩余的块 + if current_chunk: chunks.append(current_chunk) return chunks - def _get_current_state(self) -> Dict: - """获取当前输入状态""" - typing_duration = time.time() - self.typing_session['start_time'] - fatigue = min(typing_duration / 300, 0.7) - - self.advanced_config['energy_level'] *= (1 - fatigue * 0.1) - self.advanced_config['emotion_state'] *= random.uniform(0.98, 1.02) - - return { - 'energy_level': max(0.3, self.advanced_config['energy_level']), - 'emotion_state': max(0.4, min(1.0, self.advanced_config['emotion_state'])), - 'typing_proficiency': self.advanced_config['typing_proficiency'], - 'current_error_rate': self._calculate_error_rate(fatigue) - } + def _split_long_word(self, word: str) -> List[str]: + """处理长词的切分""" + result = [] + temp = "" + for char in word: + temp += char + if len(temp) == 2: # 按双字切分 + result.append(temp) + temp = "" + if temp: # 处理剩余字符 + result.append(temp) + return result - async def _type_chunk(self, chunk: str, state: Dict): - """输入文本块""" - for char in chunk: - typing_speed = self._calculate_typing_speed(state) - - if random.random() < state['current_error_rate']: - await self._handle_typing_error(char, state) + def _update_state(self, chars_typed: int = 1): + """更新状态""" + current_time = time.time() + + # 更新连续输入时间 + if current_time - self.state['last_break_time'] > 5: # 如果超过5秒没有长停顿 + self.state['continuous_typing'] += chars_typed + + # 更新疲劳度 + fatigue_increase = chars_typed / self.config['fatigue_threshold'] + self.state['fatigue'] = min(1.0, self.state['fatigue'] + fatigue_increase) + + # 更新注意力 + if self.state['continuous_typing'] > 100: # 连续输入超过100个字符 + self.state['attention'] *= 0.95 # 注意力下降 + + # 记录字符数 + self.state['chars_typed'] += chars_typed + + # 检查是否需要休息 + if self.state['fatigue'] > 0.7 or self.state['attention'] < 0.5: + return True + return False + + def _take_break(self): + """模拟休息""" + self.state['fatigue'] *= 0.5 # 疲劳度减半 + self.state['attention'] = min(1.0, self.state['attention'] * 1.5) # 注意力恢复 + self.state['continuous_typing'] = 0 # 重置连续输入 + self.state['last_break_time'] = time.time() # 更新休息时间 + + def _get_current_error_rate(self) -> float: + """获取当前错误率""" + base_rate = self.config['error_rate_base'] + fatigue_factor = self.state['fatigue'] * (self.config['error_rate_max'] - base_rate) + attention_factor = (1 - self.state['attention']) * 0.05 + return min(self.config['error_rate_max'], base_rate + fatigue_factor + attention_factor) + + async def _simulate_error(self, char: str): + """模拟输入错误""" + # 随机选择一个错误字符 + wrong_chars = '的地得了着过去来到和与及' if '\u4e00' <= char <= '\u9fff' else 'asdfjkl;' + wrong_char = random.choice(wrong_chars) + + # 输入错误字符 + await self.page.keyboard.type(wrong_char) + await asyncio.sleep(random.uniform(0.2, 0.5)) # 察觉错误的时间 + + # 删除错误字符 + await self.page.keyboard.press('Backspace') + await asyncio.sleep(random.uniform(0.1, 0.3)) # 删除后的停顿 + + # 输入正确字符 + await self.page.keyboard.type(char) + + async def _simulate_distraction(self): + """模拟轻微分心""" + distraction_time = random.uniform(0.8, 1.5) # 减少分心时间 + await asyncio.sleep(distraction_time) + self._take_break() # 分心也算是一种休息 + + async def _type_paragraph(self, paragraph: str): + """优化的段落输入方法""" + # 将段落分割成词语块 + chunks = self._split_text_into_chunks(paragraph) + + # 计算语义块(通常是3-4个词语一组) + semantic_chunks = [] + current_semantic = [] + word_count = 0 + + for chunk in chunks: + current_semantic.append(chunk) + if chunk in ',。!?、;:': # 遇到标点就是一个语义块的结束 + semantic_chunks.append(current_semantic) + current_semantic = [] + word_count = 0 else: - await self._type_char(char, typing_speed) + word_count += 1 + if word_count >= random.randint(2, 3): # 2-3个词语组成一个语义块 + semantic_chunks.append(current_semantic) + current_semantic = [] + word_count = 0 + + if current_semantic: + semantic_chunks.append(current_semantic) + + # 输入每个语义块 + for semantic_block in semantic_chunks: + # 语义块之前可能停顿思考 + if random.random() < self.config['natural_pause']: + await asyncio.sleep(random.uniform(*self.config['thought_pause'])) - self.typing_session['chars_typed'] += 1 - await self._micro_pause(state) - - def _calculate_typing_speed(self, state: Dict) -> float: - """计算实时打字速度""" - base_speed = random.uniform( - self.base_config['min_typing_speed'], - self.base_config['max_typing_speed'] - ) - - speed = base_speed * ( - 0.7 + state['energy_level'] * 0.3 + - state['emotion_state'] * 0.2 + - state['typing_proficiency'] * 0.3 - ) - - speed *= random.uniform( - 1 - self.advanced_config['speed_variance'], - 1 + self.advanced_config['speed_variance'] - ) - - return speed - - def _calculate_error_rate(self, fatigue: float) -> float: - """计算当前错误率""" - base_rate = self.advanced_config['base_error_rate'] - error_rate = base_rate * (1 + fatigue) - error_rate *= random.uniform(0.8, 1.2) - return min(error_rate, 0.15) - - async def _handle_typing_error(self, char: str, state: Dict): - """处理打字错误""" - error_types = ['typo', 'double_hit', 'delay'] - error_type = random.choice(error_types) - - if error_type == 'typo': - wrong_char = self._get_similar_char(char) - await self._type_char(wrong_char, self._calculate_typing_speed(state)) - await asyncio.sleep(random.uniform(0.2, 0.5)) - await self._press_key("Backspace") - await self._type_char(char, self._calculate_typing_speed(state)) + # 输入语义块中的每个词语 + for chunk in semantic_block: + # 检查疲劳状态 + if self._update_state(len(chunk)): + # 需要短暂休息 + await asyncio.sleep(random.uniform(0.5, 1.0)) # 减少休息时间 + self._take_break() + + # 检查是否轻微分心 + if random.random() < self.config['distraction_probability'] and len(chunk) > 2: + await self._simulate_distraction() + + # 词语输入 + for char in chunk: + # 检查是否出错(只在疲劳时更容易出错) + if self.state['fatigue'] > 0.6: # 只有疲劳时才容易出错 + current_error_rate = self._get_current_error_rate() + if random.random() < current_error_rate: + await self._simulate_error(char) + continue + + # 正常字符输入 + if char in ',。!?、;:': + # 标点符号输入 + delay = random.randint(*self.config['punct_delay']) + # 疲劳会增加延迟 + delay = int(delay * (1 + self.state['fatigue'] * 0.5)) + await self.page.keyboard.type(char, delay=delay) + # 标点符号后一定停顿 + await asyncio.sleep(random.uniform(*self.config['word_pause'])) + else: + # 普通字符输入 + delay = random.randint(*self.config['char_delay']) + # 疲劳和注意力影响输入速度 + delay = int(delay * (1 + self.state['fatigue'] * 0.5 - self.state['attention'] * 0.2)) + await self.page.keyboard.type(char, delay=delay) + + # 更新状态 + self._update_state() + + # 词语间停顿 + pause_time = random.uniform(*self.config['word_pause']) + # 疲劳会增加停顿时间 + pause_time *= (1 + self.state['fatigue'] * 0.3) + await asyncio.sleep(pause_time) - elif error_type == 'double_hit': - await self._type_char(char, self._calculate_typing_speed(state)) - await self._type_char(char, self._calculate_typing_speed(state)) - await asyncio.sleep(random.uniform(0.1, 0.3)) - await self._press_key("Backspace") - - else: # delay - await asyncio.sleep(random.uniform(0.3, 0.8)) - await self._type_char(char, self._calculate_typing_speed(state)) - - async def _natural_pause(self, state: Dict): - """自然停顿""" - base_pause = random.uniform(0.5, 1.5) - - if state['energy_level'] < 0.5: - base_pause *= 1.3 - if state['emotion_state'] < 0.6: - base_pause *= 1.2 - - await asyncio.sleep(base_pause * random.uniform(0.8, 1.2)) - - async def _micro_pause(self, state: Dict): - """字符间的微小停顿""" - pause_time = random.uniform(0.05, 0.15) - if state['energy_level'] < 0.5: - pause_time *= 1.2 - await asyncio.sleep(pause_time) - - def _get_similar_char(self, char: str) -> str: - """获取相似字符""" - similar_chars = { - '的': '地得', - '了': '着啦', - '和': '与跟', - '我': '我我', - '是': '市师', - '在': '再在', - '有': '又有', - '都': '都读', - '好': '号毫' - } - return random.choice(similar_chars.get(char, char + char)) + # 语义块之间的停顿 + pause_time = random.uniform(*self.config['chunk_pause']) + # 疲劳和注意力影响停顿时间 + pause_time *= (1 + self.state['fatigue'] * 0.5 - self.state['attention'] * 0.2) + await asyncio.sleep(pause_time) async def _prepare_input(self, selector: str): """准备输入""" try: - await self.page.wait_for_selector(selector, timeout=5000) - await self.page.click(selector) + element = await self.page.wait_for_selector(selector, timeout=5000) + await element.click() await asyncio.sleep(random.uniform(0.3, 0.8)) except Exception as e: print(f"准备输入失败: {e}") raise - async def _type_char(self, char: str, speed: float): - """输入单个字符""" - try: - delay = 1000 / speed # 转换为毫秒 - await self.page.keyboard.type(char, delay=delay) - except Exception as e: - print(f"输入字符失败: {e}") - raise +class OptimizedXHSTyping: + """优化的小红书输入模拟器""" + def __init__(self, page): + self.page = page + self.typing_config = { + 'char_delay': (100, 200), # 基础字符延迟 + 'punct_delay': (200, 300), # 标点符号延迟 + 'paragraph_pause': (0.5, 1.0), # 段落停顿 + 'natural_pause': 0.05 # 自然停顿概率 + } - async def _press_key(self, key: str): - """按键操作""" - try: - await self.page.keyboard.press(key) - except Exception as e: - print(f"按键操作失败: {e}") - raise + async def type_text(self, text: str): + paragraphs = text.split('\n\n') + + for i, para in enumerate(paragraphs): + # 段落输入 + await self._type_paragraph(para) + + # 段落间自然停顿 + if i < len(paragraphs) - 1: + await asyncio.sleep(random.uniform(*self.typing_config['paragraph_pause'])) + + async def _type_paragraph(self, paragraph: str): + char_count = 0 + + for char in paragraph: + # 随机自然停顿 + if random.random() < self.typing_config['natural_pause']: + await asyncio.sleep(random.uniform(0.2, 0.5)) + + # 字符输入 + if char in ',。!?、;:': + delay = random.randint(*self.typing_config['punct_delay']) + else: + delay = random.randint(*self.typing_config['char_delay']) + + await self.page.keyboard.type(char, delay=delay) + char_count += 1 + + # 每20-30个字符后可能停顿 + if char_count % random.randint(20, 30) == 0: + await asyncio.sleep(random.uniform(0.1, 0.3)) + +class XHSEnhancedTyping(EnhancedHumanTypingSimulator): + """小红书专用增强版输入模拟器""" + def __init__(self, page=None): + super().__init__(page) + self.tag_mode = False + + async def type_text(self, text: str, selector: str = None) -> bool: + """重写文本输入方法""" + if self.tag_mode: + # 标签模式下使用较慢的输入速度 + self.base_config.update({ + 'min_typing_speed': 5, + 'max_typing_speed': 12 + }) + else: + # 正常文本模式 + self.base_config.update({ + 'min_typing_speed': 8, + 'max_typing_speed': 20 + }) + + return await super().type_text(text, selector) + + async def handle_tag_input(self, tag: str): + """标签输入处理""" + self.tag_mode = True + + # 输入#号 + await self.page.keyboard.press("Shift") + await asyncio.sleep(random.uniform(0.1, 0.2)) + await self.page.keyboard.press("3") + await self.page.keyboard.up("Shift") + + # 输入标签文本 + await self.type_text(tag) + + # 等待建议出现 + await asyncio.sleep(random.uniform(0.3, 0.5)) + + # 70%概率选择建议 + if random.random() < 0.7: + try: + suggestions = await self.page.query_selector_all('.suggestion-item') + if suggestions: + await random.choice(suggestions[:2]).click() + await asyncio.sleep(random.uniform(0.2, 0.4)) + self.tag_mode = False + return + except: + pass + + # 如果没有选择建议,直接回车 + await self.page.keyboard.press("Enter") + await asyncio.sleep(random.uniform(0.2, 0.4)) + self.tag_mode = False