# -*- coding: utf-8 -*- """ 小红书笔记上传器 - 主实现 完全模仿视频上传器的实现,强化反爬虫能力 支持图文笔记和视频笔记 """ import os import time import random import asyncio from datetime import datetime from pathlib import Path from typing import List, Optional from playwright.async_api import Playwright, async_playwright, Page from conf import LOCAL_CHROME_PATH, BASE_DIR from utils.base_social_media import set_init_script from utils.anti_detection import create_stealth_browser, create_stealth_context from utils.human_typing_wrapper import create_human_typer, HumanTypingWrapper from utils.log import xiaohongshu_logger as logger # ============================================================================ # Cookie管理 # ============================================================================ async def cookie_auth(account_file: str) -> bool: """ 验证Cookie是否有效(完整反检测版本) Args: account_file: Cookie文件路径 Returns: bool: Cookie是否有效 """ try: async with async_playwright() as playwright: # ✅ 使用反检测浏览器 browser = await create_stealth_browser( playwright, headless=True, custom_args=['--disable-blink-features=AutomationControlled'] ) # ✅ 使用反检测上下文 context = await create_stealth_context( browser, account_file=account_file, headless=True, custom_options={ 'viewport': {'width': 1920, 'height': 1080}, 'locale': 'zh-CN', 'timezone_id': 'Asia/Shanghai', } ) # ✅ 注入stealth脚本 context = await set_init_script(context) page = await context.new_page() # 访问创作者中心 await page.goto("https://creator.xiaohongshu.com/publish/publish") try: await page.wait_for_url( "https://creator.xiaohongshu.com/publish/publish**", timeout=5000 ) except: logger.warning("[+] Cookie可能失效") await context.close() await browser.close() return False # 检查是否有登录提示 if await page.get_by_text('手机号登录').count() or await page.get_by_text('扫码登录').count(): logger.warning("[+] 检测到登录页面,Cookie失效") await context.close() await browser.close() return False logger.info("[+] Cookie有效") await context.close() await browser.close() return True except Exception as e: logger.error(f"Cookie验证失败: {e}") return False async def xiaohongshu_note_setup(account_file: str, handle: bool = False) -> bool: """ 设置小红书笔记上传器(检查或生成Cookie) Args: account_file: Cookie文件路径 handle: 是否处理Cookie失效(重新登录) Returns: bool: 设置是否成功 """ if not os.path.exists(account_file) or not await cookie_auth(account_file): if not handle: logger.warning('[!] Cookie文件不存在或已失效') return False logger.info('[+] Cookie文件不存在或已失效,即将自动打开浏览器,请扫码登录') await xiaohongshu_note_cookie_gen(account_file) return True async def xiaohongshu_note_cookie_gen(account_file: str): """ 生成Cookie(完整反检测版本) Args: account_file: Cookie保存路径 """ async with async_playwright() as playwright: # ✅ 使用反检测浏览器 browser = await create_stealth_browser( playwright, headless=False, # 生成Cookie必须使用有头模式 custom_args=[ '--disable-blink-features=AutomationControlled', '--lang=zh-CN', ] ) # ✅ 创建反检测上下文(无Cookie) context_options = { 'viewport': {'width': 1920, 'height': 1080}, 'locale': 'zh-CN', 'timezone_id': 'Asia/Shanghai', 'device_scale_factor': 1, 'has_touch': False, 'is_mobile': False, } # 有头模式下也设置真实User-Agent user_agent = random.choice([ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', ]) context_options['user_agent'] = user_agent context = await browser.new_context(**context_options) # ✅ 注入stealth脚本 context = await set_init_script(context) page = await context.new_page() await page.goto("https://creator.xiaohongshu.com/") # 暂停等待用户登录 await page.pause() # 保存Cookie await context.storage_state(path=account_file) logger.success(f'[+] Cookie已保存到: {account_file}') await context.close() await browser.close() # ============================================================================ # 基础笔记类 # ============================================================================ class XiaoHongShuNote: """小红书笔记上传器基类""" def __init__( self, title: str, content: str, tags: List[str], note_type: str, publish_date, account_file: str, location: Optional[str] = None, headless: bool = False ): """ 初始化笔记上传器 Args: title: 笔记标题 content: 笔记正文内容 tags: 话题标签列表 note_type: 笔记类型 ('image' 或 'video') publish_date: 发布时间,0表示立即发布 account_file: Cookie文件路径 location: 地点(可选) headless: 是否使用无头模式(不推荐) """ self.title = title[:30] # 限制30字符 self.content = content[:1000] # 限制1000字符 self.tags = tags[:3] # 限制3个标签 self.note_type = note_type self.publish_date = publish_date self.account_file = account_file self.location = location self.headless = headless self.local_executable_path = LOCAL_CHROME_PATH logger.info(f"初始化笔记上传器: {note_type}, 标题={title[:20]}...") async def create_note_browser(self, playwright: Playwright): """创建具有强反检测能力的浏览器""" logger.info("创建反检测浏览器...") # 自定义浏览器参数 custom_args = [ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox', '--disable-setuid-sandbox', '--disable-web-security', '--disable-features=IsolateOrigins,site-per-process', '--lang=zh-CN', '--window-size=1920,1080', ] # 创建隐蔽浏览器 browser = await create_stealth_browser( playwright, headless=self.headless, executable_path=self.local_executable_path, custom_args=custom_args ) # 创建隐蔽上下文 context = await create_stealth_context( browser, account_file=self.account_file, headless=self.headless, custom_options={ 'viewport': {'width': 1920, 'height': 1080}, 'locale': 'zh-CN', 'timezone_id': 'Asia/Shanghai', 'device_scale_factor': 1, 'has_touch': False, 'is_mobile': False, } ) return browser, context async def random_pause(self, min_sec: float = 1.0, max_sec: float = 3.0): """随机停顿""" wait_time = random.uniform(min_sec, max_sec) await asyncio.sleep(wait_time) async def simulate_human_behavior(self, page: Page): """模拟真实用户行为""" logger.info("模拟人类浏览行为...") # 随机移动鼠标 for _ in range(random.randint(2, 4)): await page.mouse.move( random.randint(100, 800), random.randint(100, 600) ) await asyncio.sleep(random.uniform(0.3, 1.0)) # 随机滚动 await page.mouse.wheel(0, random.randint(-100, 100)) await asyncio.sleep(random.uniform(0.5, 1.5)) async def navigate_to_publish_page(self, page: Page): """访问发布页面""" logger.info("访问笔记发布页面...") # 根据笔记类型选择URL if self.note_type == 'image': url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=image" else: url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=video" await page.goto(url) await page.wait_for_load_state('domcontentloaded') logger.success(f"✅ 已打开发布页面: {url}") async def fill_title(self, page: Page): """填充标题(人类化输入)""" logger.info("填充标题...") # 根据你提供的HTML结构定位标题输入框 # title_selectors = [ 'input.d-text[type="text"][placeholder="填写标题会有更多赞哦~"]', 'input.d-text[placeholder*="标题"]', 'div.plugin.title-container input.d-text', '.notranslate', ] title_input = None for selector in title_selectors: try: title_input = await page.wait_for_selector(selector, timeout=3000) if title_input: logger.info(f"找到标题输入框: {selector}") break except: continue if not title_input: raise Exception("未找到标题输入框") # 先点击输入框获得焦点 await title_input.click() await self.random_pause(0.3, 0.8) # 创建人类化输入器 normal_typer = create_human_typer(page, { 'min_delay': 80, 'max_delay': 150, 'pause_probability': 0.15, 'pause_min': 300, 'pause_max': 800, }) # 输入标题 success = await normal_typer.type_text_human( title_selectors[0], self.title, clear_first=True ) if not success: logger.warning("人类化输入失败,使用传统方式") await title_input.click() await page.keyboard.press("Control+A") await page.keyboard.press("Delete") await page.keyboard.type(self.title, delay=100) logger.success(f"✅ 标题填充完成: {self.title}") await self.random_pause(0.5, 1.5) async def fill_content_text(self, page: Page): """填充正文内容(直接粘贴,参考视频上传器)""" logger.info("填充正文...") # 根据你提供的HTML结构定位TipTap编辑器 #
content_selectors = [ 'div.tiptap.ProseMirror[contenteditable="true"]', 'div[contenteditable="true"][role="textbox"].tiptap', 'div.editor-container div.tiptap[contenteditable="true"]', '#publish-container .editor-content > div > div', ] content_input = None for selector in content_selectors: try: content_input = await page.wait_for_selector(selector, timeout=3000) if content_input: logger.info(f"找到正文输入框: {selector}") break except: continue if not content_input: logger.warning("未找到正文输入框,跳过") return # 点击输入框获得焦点 await content_input.click() await self.random_pause(0.3, 0.8) # 直接粘贴内容(不使用人类化输入,速度更快) # 使用 Ctrl+V 模拟粘贴操作 try: # 方式1:使用 page.evaluate 直接设置内容(最快) await page.evaluate(f''' (selector) => {{ const element = document.querySelector(selector); if (element) {{ element.textContent = `{self.content.replace('`', '\\`')}`; element.dispatchEvent(new Event('input', {{ bubbles: true }})); }} }} ''', content_selectors[0]) logger.success(f"✅ 正文填充完成(直接粘贴,{len(self.content)} 字符)") except Exception as e: logger.warning(f"直接粘贴失败: {e},使用键盘输入") # 备用方案:使用键盘快速输入(不使用人类化延迟) await page.keyboard.type(self.content, delay=0) logger.success(f"✅ 正文填充完成(键盘输入,{len(self.content)} 字符)") await self.random_pause(0.5, 1.0) async def add_tags(self, page: Page): """添加话题标签(极慢速模式)""" logger.info("添加话题标签...") # 标签应该添加到正文编辑器中 # 使用与正文相同的TipTap编辑器 tag_selectors = [ 'div.tiptap.ProseMirror[contenteditable="true"]', 'div[contenteditable="true"][role="textbox"].tiptap', '#publish-container .editor-content > div > div', ] tag_selector = tag_selectors[0] # 创建极慢速输入器(标签输入需要更慢) slow_typer = HumanTypingWrapper(page, { 'min_delay': 500, 'max_delay': 800, 'pause_probability': 0.3, 'pause_min': 500, 'pause_max': 1200, 'correction_probability': 0.0, 'backspace_probability': 0.0, }) # 如果正文不为空,先添加换行 if self.content: await page.keyboard.press("Enter") await self.random_pause(0.5, 1.0) # 逐个输入标签 for i, tag in enumerate(self.tags): logger.info(f"输入标签 {i+1}/{len(self.tags)}: {tag}") # 输入 # 符号和标签文本 tag_text = f"#{tag}" success = await slow_typer.type_text_human( tag_selector, tag_text, clear_first=False ) if not success: logger.warning(f"标签 {tag} 人类化输入失败,使用传统方式") await page.keyboard.type("#") for char in tag: await page.keyboard.type(char, delay=600) await asyncio.sleep(0.1) # 按回车或空格(标签之间用换行分隔) await page.keyboard.press("Enter") # 标签间停顿800ms await page.wait_for_timeout(800) logger.info(f"✅ 标签 {tag} 添加完成") logger.success(f"✅ 所有标签添加完成 (共 {len(self.tags)} 个)") await self.random_pause(1, 2) async def set_location(self, page: Page, location: str): """设置发布地点""" logger.info(f"设置地点: {location}") try: # 点击地点输入框 loc_selectors = [ 'div.d-text.d-select-placeholder', 'input[placeholder*="地点"]', 'div:has-text("添加地点")', ] loc_element = None for selector in loc_selectors: try: loc_element = await page.wait_for_selector(selector, timeout=3000) if loc_element: break except: continue if not loc_element: logger.warning("未找到地点输入框,跳过") return False await loc_element.click() await self.random_pause(0.5, 1.0) # 输入地点名称 await page.keyboard.type(location, delay=200) logger.info(f"已输入地点名称: {location}") # 等待下拉列表加载 await asyncio.sleep(3) # 使用灵活的选择器 flexible_xpath = ( f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]' f'//div[contains(@class, "d-options-wrapper")]' f'//div[contains(@class, "name") and text()="{location}"]' ) try: location_option = await page.wait_for_selector( flexible_xpath, timeout=5000 ) if location_option: await location_option.scroll_into_view_if_needed() await asyncio.sleep(0.5) await location_option.click() logger.success(f"✅ 地点设置成功: {location}") await self.random_pause(0.5, 1.0) return True except Exception: logger.warning(f"未找到匹配的地点,尝试选择第一个选项") # 尝试选择第一个选项 first_option_xpath = ( f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]' f'//div[contains(@class, "d-options-wrapper")]' f'//div[contains(@class, "d-option-item")][1]' ) first_option = await page.query_selector(first_option_xpath) if first_option: await first_option.click() logger.info("已选择第一个推荐地点") return True logger.warning("地点设置失败,跳过") return False except Exception as e: logger.error(f"设置地点失败: {e}") return False async def set_schedule_time(self, page: Page, publish_date: datetime): """设置定时发布时间(参考视频上传器的实现)""" logger.info(f"设置定时发布: {publish_date}") try: # 点击"定时发布"标签(使用locator方式) logger.info(" [-] 正在设置定时发布时间...") # 使用Playwright的locator定位"定时发布"标签 schedule_label = page.locator("label:has-text('定时发布')") await schedule_label.click() await asyncio.sleep(1) # 格式化发布时间(格式:YYYY-MM-DD HH:MM) publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M") logger.info(f"发布时间: {publish_date_hour}") await asyncio.sleep(1) # 点击时间输入框并输入时间 await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click() await page.keyboard.press("Control+KeyA") await page.keyboard.type(str(publish_date_hour)) await page.keyboard.press("Enter") await asyncio.sleep(1) logger.success(f"✅ 定时发布时间设置完成: {publish_date_hour}") except Exception as e: logger.error(f"设置定时发布失败: {e}") raise async def pre_publish_check(self, page: Page): """发布前检查""" logger.info("执行发布前检查...") checks = [] # 检查标题 try: title_filled = await page.query_selector('input[class*="title"]') title_value = await title_filled.input_value() if title_filled else "" checks.append(("标题", len(title_value) > 0)) except: checks.append(("标题", False)) # 检查标签 try: tags = await page.query_selector_all('span[class*="topic"], span[class*="tag"]') checks.append(("标签", len(tags) > 0)) except: checks.append(("标签", False)) # 打印检查结果 for name, passed in checks: status = "✅" if passed else "❌" logger.info(f"{status} {name}: {'已填充' if passed else '未填充'}") # 如果有未通过的检查 failed_checks = [name for name, passed in checks if not passed] if failed_checks: logger.warning(f"以下项目未完成: {', '.join(failed_checks)}") async def publish(self, page: Page): """点击发布按钮""" logger.info("准备发布...") try: # 等待发布按钮可点击 publish_button_text = "定时发布" if self.publish_date != 0 else "发布" publish_button = await page.wait_for_selector( f'button:has-text("{publish_button_text}")', timeout=10000 ) # 模拟犹豫 await self.random_pause(1, 3) # 点击发布 await publish_button.click() logger.info(f"已点击'{publish_button_text}'按钮") except Exception as e: logger.error(f"点击发布按钮失败: {e}") raise async def wait_publish_success(self, page: Page): """等待发布成功""" logger.info("等待发布完成...") max_wait = 30 # 最多等待30秒 try: # 等待跳转到成功页面 await page.wait_for_url( "https://creator.xiaohongshu.com/publish/success?**", timeout=max_wait * 1000 ) logger.success("✅ 笔记发布成功!") return True except Exception as e: logger.error(f"发布失败或超时: {e}") # 截图保存 screenshot_path = f"publish_failed_{int(time.time())}.png" await page.screenshot(path=screenshot_path, full_page=True) logger.info(f"错误截图已保存: {screenshot_path}") raise async def main(self): """主上传流程 - 子类必须实现""" raise NotImplementedError("子类必须实现main方法") # ============================================================================ # 图文笔记类 # ============================================================================ class XiaoHongShuImageNote(XiaoHongShuNote): """小红书图文笔记上传器""" def __init__( self, title: str, content: str, tags: List[str], image_paths: List[str], publish_date, account_file: str, cover_index: int = 0, filter_name: Optional[str] = None, location: Optional[str] = None, headless: bool = False ): """ 初始化图文笔记上传器 Args: image_paths: 图片路径列表(1-9张) cover_index: 封面索引(0-8) filter_name: 滤镜名称(可选) 其他参数见基类 """ super().__init__(title, content, tags, 'image', publish_date, account_file, location, headless) # 验证图片数量 if not image_paths or len(image_paths) > 9: raise ValueError("图片数量必须在1-9张之间") self.image_paths = image_paths self.cover_index = cover_index self.filter_name = filter_name logger.info(f"初始化图文笔记上传器: {len(image_paths)} 张图片") async def upload_images(self, page: Page): """上传图片""" logger.info(f"准备上传 {len(self.image_paths)} 张图片") # 根据你提供的HTML结构定位上传输入框 # upload_selectors = [ "input.upload-input[type='file'][accept*='.jpg']", "input.upload-input[accept*='.png']", "input[type='file'][multiple][accept*='.jpg,.jpeg,.png']", "div.upload-wrapper input.upload-input", ] upload_input = None for selector in upload_selectors: try: upload_input = await page.wait_for_selector(selector, timeout=5000) if upload_input: logger.info(f"找到上传元素: {selector}") break except: continue if not upload_input: raise Exception("未找到图片上传元素") # 批量上传所有图片(因为input支持multiple) logger.info(f"批量上传 {len(self.image_paths)} 张图片...") # 验证所有文件存在 for image_path in self.image_paths: if not os.path.exists(image_path): raise FileNotFoundError(f"图片文件不存在: {image_path}") # 一次性上传所有图片 await upload_input.set_input_files(self.image_paths) # 等待所有图片预览加载完成 await self.wait_all_images_preview(page, len(self.image_paths)) logger.success(f"✅ 所有图片上传完成") async def wait_all_images_preview(self, page: Page, expected_count: int): """等待所有图片预览加载完成""" max_wait = 60 # 最多等待60秒 waited = 0 check_interval = 1 # 每秒检查一次 logger.info(f"等待 {expected_count} 张图片预览加载...") while waited < max_wait: try: # 查找所有图片预览元素 preview_selectors = [ 'div[class*="image-item"]', 'div[class*="photo-item"]', 'div.upload-wrapper img', 'img[src*="blob"]', ] loaded_count = 0 for selector in preview_selectors: previews = await page.query_selector_all(selector) if len(previews) >= expected_count: loaded_count = len(previews) break if loaded_count >= expected_count: logger.success(f"✅ {loaded_count} 张图片预览已加载") return True # 显示进度 if waited % 5 == 0: logger.info(f"已等待 {waited}秒,当前已加载 {loaded_count}/{expected_count} 张图片") await asyncio.sleep(check_interval) waited += check_interval except Exception as e: await asyncio.sleep(check_interval) waited += check_interval logger.warning(f"图片预览等待超时(已等待{waited}秒)") return False async def wait_image_preview(self, page: Page, image_index: int): """等待单张图片预览加载(已弃用,保留兼容性)""" max_wait = 30 # 最多等待30秒 waited = 0 while waited < max_wait: try: # 查找预览容器 preview_container = await page.query_selector_all( 'div[class*="preview"], div[class*="image-item"], div[class*="photo"]' ) if len(preview_container) > image_index: # 检查是否有加载完成标识 img_element = await preview_container[image_index].query_selector('img') if img_element: src = await img_element.get_attribute('src') if src and (src.startswith('http') or src.startswith('blob')): logger.info(f"图片 {image_index+1} 预览加载完成") return True await asyncio.sleep(0.5) waited += 0.5 except Exception as e: await asyncio.sleep(0.5) waited += 0.5 logger.warning(f"图片 {image_index+1} 预览超时") return False async def set_cover(self, page: Page): """设置封面""" if self.cover_index == 0: logger.info("使用默认封面(第一张图片)") return logger.info(f"设置封面: 第 {self.cover_index+1} 张图片") try: # 这里需要根据实际页面结构实现 # 小红书图文笔记默认使用第一张作为封面 logger.info("图文笔记默认使用第一张图片作为封面") except Exception as e: logger.error(f"设置封面失败: {e}") async def apply_filters(self, page: Page): """应用滤镜""" if not self.filter_name: logger.info("未设置滤镜,跳过") return logger.info(f"应用滤镜: {self.filter_name}") try: # 这里需要根据实际页面结构实现 logger.info("滤镜功能待实现") except Exception as e: logger.error(f"应用滤镜失败: {e}") async def main(self): """图文笔记主上传流程""" async with async_playwright() as playwright: browser = None context = None try: # 步骤1: 创建浏览器 logger.info("[1/11] 创建浏览器环境...") browser, context = await self.create_note_browser(playwright) # 步骤2: 创建页面 logger.info("[2/11] 创建页面...") page = await context.new_page() # 步骤3: 访问发布页面 logger.info("[3/11] 访问笔记发布页面...") await self.navigate_to_publish_page(page) # 步骤4: 模拟人类浏览 logger.info("[4/11] 模拟浏览行为...") await self.simulate_human_behavior(page) # 步骤5: 上传图片 logger.info("[5/11] 上传图片...") await self.upload_images(page) # 步骤6: 填充标题 logger.info("[6/11] 填充标题...") await self.fill_title(page) # 步骤7: 填充正文 logger.info("[7/11] 填充正文...") await self.fill_content_text(page) # 步骤8: 添加标签 logger.info("[8/11] 添加标签...") await self.add_tags(page) # 步骤9: 设置地点(可选) if self.location: logger.info("[9/11] 设置地点...") await self.set_location(page, self.location) else: logger.info("[9/11] 跳过地点设置") # 步骤10: 设置定时发布(可选) if self.publish_date != 0: logger.info("[10/11] 设置定时发布...") await self.set_schedule_time(page, self.publish_date) else: logger.info("[10/11] 立即发布模式") # 步骤11: 发布 logger.info("[11/11] 发布笔记...") await self.pre_publish_check(page) await self.random_pause(2, 5) # 模拟犹豫 await self.publish(page) # 等待发布成功 await self.wait_publish_success(page) # 保存Cookie await context.storage_state(path=self.account_file) logger.success("✅ Cookie已更新") logger.success("🎉 图文笔记发布成功!") except Exception as e: logger.error(f"❌ 发布失败: {e}") if context and page: await page.screenshot(path=f"error_image_note_{int(time.time())}.png", full_page=True) raise finally: if context: await context.close() if browser: await browser.close() # ============================================================================ # 视频笔记类 # ============================================================================ class XiaoHongShuVideoNote(XiaoHongShuNote): """小红书视频笔记上传器""" def __init__( self, title: str, content: str, tags: List[str], video_path: str, publish_date, account_file: str, thumbnail_path: Optional[str] = None, location: Optional[str] = None, headless: bool = False ): """ 初始化视频笔记上传器 Args: video_path: 视频文件路径 thumbnail_path: 视频封面路径(可选) 其他参数见基类 """ super().__init__(title, content, tags, 'video', publish_date, account_file, location, headless) self.video_path = video_path self.thumbnail_path = thumbnail_path # 验证文件 if not os.path.exists(video_path): raise FileNotFoundError(f"视频文件不存在: {video_path}") file_size = os.path.getsize(video_path) / (1024 * 1024) # MB logger.info(f"初始化视频笔记上传器: 视频大小 {file_size:.2f} MB") async def upload_video(self, page: Page): """上传视频""" logger.info(f"准备上传视频: {self.video_path}") file_size = os.path.getsize(self.video_path) / (1024 * 1024) # MB logger.info(f"视频大小: {file_size:.2f} MB") # 定位上传元素 upload_selectors = [ "input[type='file'][accept*='video']", "div[class^='upload-content'] input.upload-input", "input.upload-input", ] upload_input = None for selector in upload_selectors: try: upload_input = await page.wait_for_selector(selector, timeout=5000) if upload_input: logger.info(f"找到上传元素: {selector}") break except: continue if not upload_input: raise Exception("未找到视频上传元素") # 上传视频 logger.info("开始上传视频...") await upload_input.set_input_files(self.video_path) # 等待上传和转码 await self.wait_video_upload_complete(page, file_size) logger.success("✅ 视频上传完成") async def wait_video_upload_complete(self, page: Page, file_size_mb: float): """等待视频上传和转码完成""" # 估算最大等待时间 estimated_upload_time = file_size_mb * 2 # 秒 estimated_transcode_time = 60 # 秒 max_wait_time = estimated_upload_time + estimated_transcode_time + 60 logger.info(f"预计最多等待 {max_wait_time:.0f} 秒") waited = 0 check_interval = 3 # 每3秒检查一次 while waited < max_wait_time: try: # 查找"上传成功"标识 upload_input = await page.wait_for_selector('input.upload-input', timeout=3000) if upload_input: preview_new = await upload_input.query_selector( 'xpath=following-sibling::div[contains(@class, "preview")]' ) if preview_new: stage_elements = await preview_new.query_selector_all('div.stage, div[class*="status"]') for stage in stage_elements: text_content = await page.evaluate( '(element) => element.textContent', stage ) if '上传成功' in text_content or '转码完成' in text_content: logger.success("检测到上传成功标识") return True if '上传失败' in text_content or '转码失败' in text_content: raise Exception("视频上传或转码失败") # 显示等待进度 if waited % 10 == 0: logger.info(f"已等待 {waited}/{max_wait_time:.0f} 秒...") await asyncio.sleep(check_interval) waited += check_interval except Exception as e: await asyncio.sleep(check_interval) waited += check_interval raise TimeoutError(f"视频上传超时(等待了{max_wait_time:.0f}秒)") async def main(self): """视频笔记主上传流程""" async with async_playwright() as playwright: browser = None context = None try: # 步骤1: 创建浏览器 logger.info("[1/11] 创建浏览器环境...") browser, context = await self.create_note_browser(playwright) # 步骤2: 创建页面 logger.info("[2/11] 创建页面...") page = await context.new_page() # 步骤3: 访问发布页面 logger.info("[3/11] 访问笔记发布页面...") await self.navigate_to_publish_page(page) # 步骤4: 模拟人类浏览 logger.info("[4/11] 模拟浏览行为...") await self.simulate_human_behavior(page) # 步骤5: 上传视频 logger.info("[5/11] 上传视频...") await self.upload_video(page) # 步骤6: 填充标题 logger.info("[6/11] 填充标题...") await self.fill_title(page) # 步骤7: 填充正文 logger.info("[7/11] 填充正文...") await self.fill_content_text(page) # 步骤8: 添加标签 logger.info("[8/11] 添加标签...") await self.add_tags(page) # 步骤9: 设置地点(可选) if self.location: logger.info("[9/11] 设置地点...") await self.set_location(page, self.location) else: logger.info("[9/11] 跳过地点设置") # 步骤10: 设置定时发布(可选) if self.publish_date != 0: logger.info("[10/11] 设置定时发布...") await self.set_schedule_time(page, self.publish_date) else: logger.info("[10/11] 立即发布模式") # 步骤11: 发布 logger.info("[11/11] 发布笔记...") await self.pre_publish_check(page) await self.random_pause(2, 5) # 模拟犹豫 await self.publish(page) # 等待发布成功 await self.wait_publish_success(page) # 保存Cookie await context.storage_state(path=self.account_file) logger.success("✅ Cookie已更新") logger.success("🎉 视频笔记发布成功!") except Exception as e: logger.error(f"❌ 发布失败: {e}") if context and page: await page.screenshot(path=f"error_video_note_{int(time.time())}.png", full_page=True) raise finally: if context: await context.close() if browser: await browser.close()