1154 lines
42 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
"""
小红书笔记上传器 - 主实现
完全模仿视频上传器的实现强化反爬虫能力
支持图文笔记和视频笔记
"""
import os
import time
import random
import asyncio
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from playwright.async_api import Playwright, async_playwright, Page
from conf import LOCAL_CHROME_PATH, BASE_DIR
from utils.base_social_media import set_init_script
from utils.anti_detection import create_stealth_browser, create_stealth_context
from utils.human_typing_wrapper import create_human_typer, HumanTypingWrapper
from utils.log import xiaohongshu_logger as logger
# ============================================================================
# Cookie管理
# ============================================================================
async def cookie_auth(account_file: str) -> bool:
"""
验证Cookie是否有效完整反检测版本
Args:
account_file: Cookie文件路径
Returns:
bool: Cookie是否有效
"""
try:
async with async_playwright() as playwright:
# ✅ 使用反检测浏览器
browser = await create_stealth_browser(
playwright,
headless=True,
custom_args=['--disable-blink-features=AutomationControlled']
)
# ✅ 使用反检测上下文
context = await create_stealth_context(
browser,
account_file=account_file,
headless=True,
custom_options={
'viewport': {'width': 1920, 'height': 1080},
'locale': 'zh-CN',
'timezone_id': 'Asia/Shanghai',
}
)
# ✅ 注入stealth脚本
context = await set_init_script(context)
page = await context.new_page()
# 访问创作者中心
await page.goto("https://creator.xiaohongshu.com/publish/publish")
try:
await page.wait_for_url(
"https://creator.xiaohongshu.com/publish/publish**",
timeout=5000
)
except:
logger.warning("[+] Cookie可能失效")
await context.close()
await browser.close()
return False
# 检查是否有登录提示
if await page.get_by_text('手机号登录').count() or await page.get_by_text('扫码登录').count():
logger.warning("[+] 检测到登录页面Cookie失效")
await context.close()
await browser.close()
return False
logger.info("[+] Cookie有效")
await context.close()
await browser.close()
return True
except Exception as e:
logger.error(f"Cookie验证失败: {e}")
return False
async def xiaohongshu_note_setup(account_file: str, handle: bool = False) -> bool:
"""
设置小红书笔记上传器检查或生成Cookie
Args:
account_file: Cookie文件路径
handle: 是否处理Cookie失效重新登录
Returns:
bool: 设置是否成功
"""
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
logger.warning('[!] Cookie文件不存在或已失效')
return False
logger.info('[+] Cookie文件不存在或已失效即将自动打开浏览器请扫码登录')
await xiaohongshu_note_cookie_gen(account_file)
return True
async def xiaohongshu_note_cookie_gen(account_file: str):
"""
生成Cookie完整反检测版本
Args:
account_file: Cookie保存路径
"""
async with async_playwright() as playwright:
# ✅ 使用反检测浏览器
browser = await create_stealth_browser(
playwright,
headless=False, # 生成Cookie必须使用有头模式
custom_args=[
'--disable-blink-features=AutomationControlled',
'--lang=zh-CN',
]
)
# ✅ 创建反检测上下文无Cookie
context_options = {
'viewport': {'width': 1920, 'height': 1080},
'locale': 'zh-CN',
'timezone_id': 'Asia/Shanghai',
'device_scale_factor': 1,
'has_touch': False,
'is_mobile': False,
}
# 有头模式下也设置真实User-Agent
user_agent = random.choice([
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
])
context_options['user_agent'] = user_agent
context = await browser.new_context(**context_options)
# ✅ 注入stealth脚本
context = await set_init_script(context)
page = await context.new_page()
await page.goto("https://creator.xiaohongshu.com/")
# 暂停等待用户登录
await page.pause()
# 保存Cookie
await context.storage_state(path=account_file)
logger.success(f'[+] Cookie已保存到: {account_file}')
await context.close()
await browser.close()
# ============================================================================
# 基础笔记类
# ============================================================================
class XiaoHongShuNote:
"""小红书笔记上传器基类"""
def __init__(
self,
title: str,
content: str,
tags: List[str],
note_type: str,
publish_date,
account_file: str,
location: Optional[str] = None,
headless: bool = False
):
"""
初始化笔记上传器
Args:
title: 笔记标题
content: 笔记正文内容
tags: 话题标签列表
note_type: 笔记类型 ('image' 'video')
publish_date: 发布时间0表示立即发布
account_file: Cookie文件路径
location: 地点可选
headless: 是否使用无头模式不推荐
"""
self.title = title[:30] # 限制30字符
self.content = content[:1000] # 限制1000字符
self.tags = tags[:3] # 限制3个标签
self.note_type = note_type
self.publish_date = publish_date
self.account_file = account_file
self.location = location
self.headless = headless
self.local_executable_path = LOCAL_CHROME_PATH
logger.info(f"初始化笔记上传器: {note_type}, 标题={title[:20]}...")
async def create_note_browser(self, playwright: Playwright):
"""创建具有强反检测能力的浏览器"""
logger.info("创建反检测浏览器...")
# 自定义浏览器参数
custom_args = [
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-web-security',
'--disable-features=IsolateOrigins,site-per-process',
'--lang=zh-CN',
'--window-size=1920,1080',
]
# 创建隐蔽浏览器
browser = await create_stealth_browser(
playwright,
headless=self.headless,
executable_path=self.local_executable_path,
custom_args=custom_args
)
# 创建隐蔽上下文
context = await create_stealth_context(
browser,
account_file=self.account_file,
headless=self.headless,
custom_options={
'viewport': {'width': 1920, 'height': 1080},
'locale': 'zh-CN',
'timezone_id': 'Asia/Shanghai',
'device_scale_factor': 1,
'has_touch': False,
'is_mobile': False,
}
)
return browser, context
async def random_pause(self, min_sec: float = 1.0, max_sec: float = 3.0):
"""随机停顿"""
wait_time = random.uniform(min_sec, max_sec)
await asyncio.sleep(wait_time)
async def simulate_human_behavior(self, page: Page):
"""模拟真实用户行为"""
logger.info("模拟人类浏览行为...")
# 随机移动鼠标
for _ in range(random.randint(2, 4)):
await page.mouse.move(
random.randint(100, 800),
random.randint(100, 600)
)
await asyncio.sleep(random.uniform(0.3, 1.0))
# 随机滚动
await page.mouse.wheel(0, random.randint(-100, 100))
await asyncio.sleep(random.uniform(0.5, 1.5))
async def navigate_to_publish_page(self, page: Page):
"""访问发布页面"""
logger.info("访问笔记发布页面...")
# 根据笔记类型选择URL
if self.note_type == 'image':
url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=image"
else:
url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=video"
await page.goto(url)
await page.wait_for_load_state('domcontentloaded')
logger.success(f"✅ 已打开发布页面: {url}")
async def fill_title(self, page: Page):
"""填充标题(人类化输入)"""
logger.info("填充标题...")
# 根据你提供的HTML结构定位标题输入框
# <input class="d-text" type="text" placeholder="填写标题会有更多赞哦~" value="">
title_selectors = [
'input.d-text[type="text"][placeholder="填写标题会有更多赞哦~"]',
'input.d-text[placeholder*="标题"]',
'div.plugin.title-container input.d-text',
'.notranslate',
]
title_input = None
for selector in title_selectors:
try:
title_input = await page.wait_for_selector(selector, timeout=3000)
if title_input:
logger.info(f"找到标题输入框: {selector}")
break
except:
continue
if not title_input:
raise Exception("未找到标题输入框")
# 先点击输入框获得焦点
await title_input.click()
await self.random_pause(0.3, 0.8)
# 创建人类化输入器
normal_typer = create_human_typer(page, {
'min_delay': 80,
'max_delay': 150,
'pause_probability': 0.15,
'pause_min': 300,
'pause_max': 800,
})
# 输入标题
success = await normal_typer.type_text_human(
title_selectors[0],
self.title,
clear_first=True
)
if not success:
logger.warning("人类化输入失败,使用传统方式")
await title_input.click()
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await page.keyboard.type(self.title, delay=100)
logger.success(f"✅ 标题填充完成: {self.title}")
await self.random_pause(0.5, 1.5)
async def fill_content_text(self, page: Page):
"""填充正文内容(直接粘贴,参考视频上传器)"""
logger.info("填充正文...")
# 根据你提供的HTML结构定位TipTap编辑器
# <div contenteditable="true" role="textbox" translate="no" class="tiptap ProseMirror" tabindex="0">
content_selectors = [
'div.tiptap.ProseMirror[contenteditable="true"]',
'div[contenteditable="true"][role="textbox"].tiptap',
'div.editor-container div.tiptap[contenteditable="true"]',
'#publish-container .editor-content > div > div',
]
content_input = None
for selector in content_selectors:
try:
content_input = await page.wait_for_selector(selector, timeout=3000)
if content_input:
logger.info(f"找到正文输入框: {selector}")
break
except:
continue
if not content_input:
logger.warning("未找到正文输入框,跳过")
return
# 点击输入框获得焦点
await content_input.click()
await self.random_pause(0.3, 0.8)
# 直接粘贴内容(不使用人类化输入,速度更快)
# 使用 Ctrl+V 模拟粘贴操作
try:
# 方式1使用 page.evaluate 直接设置内容(最快)
await page.evaluate(f'''
(selector) => {{
const element = document.querySelector(selector);
if (element) {{
element.textContent = `{self.content.replace('`', '\\`')}`;
element.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
}}
''', content_selectors[0])
logger.success(f"✅ 正文填充完成(直接粘贴,{len(self.content)} 字符)")
except Exception as e:
logger.warning(f"直接粘贴失败: {e},使用键盘输入")
# 备用方案:使用键盘快速输入(不使用人类化延迟)
await page.keyboard.type(self.content, delay=0)
logger.success(f"✅ 正文填充完成(键盘输入,{len(self.content)} 字符)")
await self.random_pause(0.5, 1.0)
async def add_tags(self, page: Page):
"""添加话题标签(极慢速模式)"""
logger.info("添加话题标签...")
# 标签应该添加到正文编辑器中
# 使用与正文相同的TipTap编辑器
tag_selectors = [
'div.tiptap.ProseMirror[contenteditable="true"]',
'div[contenteditable="true"][role="textbox"].tiptap',
'#publish-container .editor-content > div > div',
]
tag_selector = tag_selectors[0]
# 创建极慢速输入器(标签输入需要更慢)
slow_typer = HumanTypingWrapper(page, {
'min_delay': 500,
'max_delay': 800,
'pause_probability': 0.3,
'pause_min': 500,
'pause_max': 1200,
'correction_probability': 0.0,
'backspace_probability': 0.0,
})
# 如果正文不为空,先添加换行
if self.content:
await page.keyboard.press("Enter")
await self.random_pause(0.5, 1.0)
# 逐个输入标签
for i, tag in enumerate(self.tags):
logger.info(f"输入标签 {i+1}/{len(self.tags)}: {tag}")
# 输入 # 符号和标签文本
tag_text = f"#{tag}"
success = await slow_typer.type_text_human(
tag_selector,
tag_text,
clear_first=False
)
if not success:
logger.warning(f"标签 {tag} 人类化输入失败,使用传统方式")
await page.keyboard.type("#")
for char in tag:
await page.keyboard.type(char, delay=600)
await asyncio.sleep(0.1)
# 按回车或空格(标签之间用换行分隔)
await page.keyboard.press("Enter")
# 标签间停顿800ms
await page.wait_for_timeout(800)
logger.info(f"✅ 标签 {tag} 添加完成")
logger.success(f"✅ 所有标签添加完成 (共 {len(self.tags)} 个)")
await self.random_pause(1, 2)
async def set_location(self, page: Page, location: str):
"""设置发布地点"""
logger.info(f"设置地点: {location}")
try:
# 点击地点输入框
loc_selectors = [
'div.d-text.d-select-placeholder',
'input[placeholder*="地点"]',
'div:has-text("添加地点")',
]
loc_element = None
for selector in loc_selectors:
try:
loc_element = await page.wait_for_selector(selector, timeout=3000)
if loc_element:
break
except:
continue
if not loc_element:
logger.warning("未找到地点输入框,跳过")
return False
await loc_element.click()
await self.random_pause(0.5, 1.0)
# 输入地点名称
await page.keyboard.type(location, delay=200)
logger.info(f"已输入地点名称: {location}")
# 等待下拉列表加载
await asyncio.sleep(3)
# 使用灵活的选择器
flexible_xpath = (
f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
f'//div[contains(@class, "d-options-wrapper")]'
f'//div[contains(@class, "name") and text()="{location}"]'
)
try:
location_option = await page.wait_for_selector(
flexible_xpath,
timeout=5000
)
if location_option:
await location_option.scroll_into_view_if_needed()
await asyncio.sleep(0.5)
await location_option.click()
logger.success(f"✅ 地点设置成功: {location}")
await self.random_pause(0.5, 1.0)
return True
except Exception:
logger.warning(f"未找到匹配的地点,尝试选择第一个选项")
# 尝试选择第一个选项
first_option_xpath = (
f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
f'//div[contains(@class, "d-options-wrapper")]'
f'//div[contains(@class, "d-option-item")][1]'
)
first_option = await page.query_selector(first_option_xpath)
if first_option:
await first_option.click()
logger.info("已选择第一个推荐地点")
return True
logger.warning("地点设置失败,跳过")
return False
except Exception as e:
logger.error(f"设置地点失败: {e}")
return False
async def set_schedule_time(self, page: Page, publish_date: datetime):
"""设置定时发布时间(参考视频上传器的实现)"""
logger.info(f"设置定时发布: {publish_date}")
try:
# 点击"定时发布"标签使用locator方式
logger.info(" [-] 正在设置定时发布时间...")
# 使用Playwright的locator定位"定时发布"标签
schedule_label = page.locator("label:has-text('定时发布')")
await schedule_label.click()
await asyncio.sleep(1)
# 格式化发布时间格式YYYY-MM-DD HH:MM
publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
logger.info(f"发布时间: {publish_date_hour}")
await asyncio.sleep(1)
# 点击时间输入框并输入时间
await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click()
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(publish_date_hour))
await page.keyboard.press("Enter")
await asyncio.sleep(1)
logger.success(f"✅ 定时发布时间设置完成: {publish_date_hour}")
except Exception as e:
logger.error(f"设置定时发布失败: {e}")
raise
async def pre_publish_check(self, page: Page):
"""发布前检查"""
logger.info("执行发布前检查...")
checks = []
# 检查标题
try:
title_filled = await page.query_selector('input[class*="title"]')
title_value = await title_filled.input_value() if title_filled else ""
checks.append(("标题", len(title_value) > 0))
except:
checks.append(("标题", False))
# 检查标签
try:
tags = await page.query_selector_all('span[class*="topic"], span[class*="tag"]')
checks.append(("标签", len(tags) > 0))
except:
checks.append(("标签", False))
# 打印检查结果
for name, passed in checks:
status = "" if passed else ""
logger.info(f"{status} {name}: {'已填充' if passed else '未填充'}")
# 如果有未通过的检查
failed_checks = [name for name, passed in checks if not passed]
if failed_checks:
logger.warning(f"以下项目未完成: {', '.join(failed_checks)}")
async def publish(self, page: Page):
"""点击发布按钮"""
logger.info("准备发布...")
try:
# 等待发布按钮可点击
publish_button_text = "定时发布" if self.publish_date != 0 else "发布"
publish_button = await page.wait_for_selector(
f'button:has-text("{publish_button_text}")',
timeout=10000
)
# 模拟犹豫
await self.random_pause(1, 3)
# 点击发布
await publish_button.click()
logger.info(f"已点击'{publish_button_text}'按钮")
except Exception as e:
logger.error(f"点击发布按钮失败: {e}")
raise
async def wait_publish_success(self, page: Page):
"""等待发布成功"""
logger.info("等待发布完成...")
max_wait = 30 # 最多等待30秒
try:
# 等待跳转到成功页面
await page.wait_for_url(
"https://creator.xiaohongshu.com/publish/success?**",
timeout=max_wait * 1000
)
logger.success("✅ 笔记发布成功!")
return True
except Exception as e:
logger.error(f"发布失败或超时: {e}")
# 截图保存
screenshot_path = f"publish_failed_{int(time.time())}.png"
await page.screenshot(path=screenshot_path, full_page=True)
logger.info(f"错误截图已保存: {screenshot_path}")
raise
async def main(self):
"""主上传流程 - 子类必须实现"""
raise NotImplementedError("子类必须实现main方法")
# ============================================================================
# 图文笔记类
# ============================================================================
class XiaoHongShuImageNote(XiaoHongShuNote):
"""小红书图文笔记上传器"""
def __init__(
self,
title: str,
content: str,
tags: List[str],
image_paths: List[str],
publish_date,
account_file: str,
cover_index: int = 0,
filter_name: Optional[str] = None,
location: Optional[str] = None,
headless: bool = False
):
"""
初始化图文笔记上传器
Args:
image_paths: 图片路径列表1-9
cover_index: 封面索引0-8
filter_name: 滤镜名称可选
其他参数见基类
"""
super().__init__(title, content, tags, 'image', publish_date, account_file, location, headless)
# 验证图片数量
if not image_paths or len(image_paths) > 9:
raise ValueError("图片数量必须在1-9张之间")
self.image_paths = image_paths
self.cover_index = cover_index
self.filter_name = filter_name
logger.info(f"初始化图文笔记上传器: {len(image_paths)} 张图片")
async def upload_images(self, page: Page):
"""上传图片"""
logger.info(f"准备上传 {len(self.image_paths)} 张图片")
# 根据你提供的HTML结构定位上传输入框
# <input data-v-5aa68852="" data-v-7cbccdb2-s="" class="upload-input" type="file" multiple="" accept=".jpg,.jpeg,.png">
upload_selectors = [
"input.upload-input[type='file'][accept*='.jpg']",
"input.upload-input[accept*='.png']",
"input[type='file'][multiple][accept*='.jpg,.jpeg,.png']",
"div.upload-wrapper input.upload-input",
]
upload_input = None
for selector in upload_selectors:
try:
upload_input = await page.wait_for_selector(selector, timeout=5000)
if upload_input:
logger.info(f"找到上传元素: {selector}")
break
except:
continue
if not upload_input:
raise Exception("未找到图片上传元素")
# 批量上传所有图片因为input支持multiple
logger.info(f"批量上传 {len(self.image_paths)} 张图片...")
# 验证所有文件存在
for image_path in self.image_paths:
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片文件不存在: {image_path}")
# 一次性上传所有图片
await upload_input.set_input_files(self.image_paths)
# 等待所有图片预览加载完成
await self.wait_all_images_preview(page, len(self.image_paths))
logger.success(f"✅ 所有图片上传完成")
async def wait_all_images_preview(self, page: Page, expected_count: int):
"""等待所有图片预览加载完成"""
max_wait = 60 # 最多等待60秒
waited = 0
check_interval = 1 # 每秒检查一次
logger.info(f"等待 {expected_count} 张图片预览加载...")
while waited < max_wait:
try:
# 查找所有图片预览元素
preview_selectors = [
'div[class*="image-item"]',
'div[class*="photo-item"]',
'div.upload-wrapper img',
'img[src*="blob"]',
]
loaded_count = 0
for selector in preview_selectors:
previews = await page.query_selector_all(selector)
if len(previews) >= expected_count:
loaded_count = len(previews)
break
if loaded_count >= expected_count:
logger.success(f"{loaded_count} 张图片预览已加载")
return True
# 显示进度
if waited % 5 == 0:
logger.info(f"已等待 {waited}秒,当前已加载 {loaded_count}/{expected_count} 张图片")
await asyncio.sleep(check_interval)
waited += check_interval
except Exception as e:
await asyncio.sleep(check_interval)
waited += check_interval
logger.warning(f"图片预览等待超时(已等待{waited}秒)")
return False
async def wait_image_preview(self, page: Page, image_index: int):
"""等待单张图片预览加载(已弃用,保留兼容性)"""
max_wait = 30 # 最多等待30秒
waited = 0
while waited < max_wait:
try:
# 查找预览容器
preview_container = await page.query_selector_all(
'div[class*="preview"], div[class*="image-item"], div[class*="photo"]'
)
if len(preview_container) > image_index:
# 检查是否有加载完成标识
img_element = await preview_container[image_index].query_selector('img')
if img_element:
src = await img_element.get_attribute('src')
if src and (src.startswith('http') or src.startswith('blob')):
logger.info(f"图片 {image_index+1} 预览加载完成")
return True
await asyncio.sleep(0.5)
waited += 0.5
except Exception as e:
await asyncio.sleep(0.5)
waited += 0.5
logger.warning(f"图片 {image_index+1} 预览超时")
return False
async def set_cover(self, page: Page):
"""设置封面"""
if self.cover_index == 0:
logger.info("使用默认封面(第一张图片)")
return
logger.info(f"设置封面: 第 {self.cover_index+1} 张图片")
try:
# 这里需要根据实际页面结构实现
# 小红书图文笔记默认使用第一张作为封面
logger.info("图文笔记默认使用第一张图片作为封面")
except Exception as e:
logger.error(f"设置封面失败: {e}")
async def apply_filters(self, page: Page):
"""应用滤镜"""
if not self.filter_name:
logger.info("未设置滤镜,跳过")
return
logger.info(f"应用滤镜: {self.filter_name}")
try:
# 这里需要根据实际页面结构实现
logger.info("滤镜功能待实现")
except Exception as e:
logger.error(f"应用滤镜失败: {e}")
async def main(self):
"""图文笔记主上传流程"""
async with async_playwright() as playwright:
browser = None
context = None
try:
# 步骤1: 创建浏览器
logger.info("[1/11] 创建浏览器环境...")
browser, context = await self.create_note_browser(playwright)
# 步骤2: 创建页面
logger.info("[2/11] 创建页面...")
page = await context.new_page()
# 步骤3: 访问发布页面
logger.info("[3/11] 访问笔记发布页面...")
await self.navigate_to_publish_page(page)
# 步骤4: 模拟人类浏览
logger.info("[4/11] 模拟浏览行为...")
await self.simulate_human_behavior(page)
# 步骤5: 上传图片
logger.info("[5/11] 上传图片...")
await self.upload_images(page)
# 步骤6: 填充标题
logger.info("[6/11] 填充标题...")
await self.fill_title(page)
# 步骤7: 填充正文
logger.info("[7/11] 填充正文...")
await self.fill_content_text(page)
# 步骤8: 添加标签
logger.info("[8/11] 添加标签...")
await self.add_tags(page)
# 步骤9: 设置地点(可选)
if self.location:
logger.info("[9/11] 设置地点...")
await self.set_location(page, self.location)
else:
logger.info("[9/11] 跳过地点设置")
# 步骤10: 设置定时发布(可选)
if self.publish_date != 0:
logger.info("[10/11] 设置定时发布...")
await self.set_schedule_time(page, self.publish_date)
else:
logger.info("[10/11] 立即发布模式")
# 步骤11: 发布
logger.info("[11/11] 发布笔记...")
await self.pre_publish_check(page)
await self.random_pause(2, 5) # 模拟犹豫
await self.publish(page)
# 等待发布成功
await self.wait_publish_success(page)
# 保存Cookie
await context.storage_state(path=self.account_file)
logger.success("✅ Cookie已更新")
logger.success("🎉 图文笔记发布成功!")
except Exception as e:
logger.error(f"❌ 发布失败: {e}")
if context and page:
await page.screenshot(path=f"error_image_note_{int(time.time())}.png", full_page=True)
raise
finally:
if context:
await context.close()
if browser:
await browser.close()
# ============================================================================
# 视频笔记类
# ============================================================================
class XiaoHongShuVideoNote(XiaoHongShuNote):
"""小红书视频笔记上传器"""
def __init__(
self,
title: str,
content: str,
tags: List[str],
video_path: str,
publish_date,
account_file: str,
thumbnail_path: Optional[str] = None,
location: Optional[str] = None,
headless: bool = False
):
"""
初始化视频笔记上传器
Args:
video_path: 视频文件路径
thumbnail_path: 视频封面路径可选
其他参数见基类
"""
super().__init__(title, content, tags, 'video', publish_date, account_file, location, headless)
self.video_path = video_path
self.thumbnail_path = thumbnail_path
# 验证文件
if not os.path.exists(video_path):
raise FileNotFoundError(f"视频文件不存在: {video_path}")
file_size = os.path.getsize(video_path) / (1024 * 1024) # MB
logger.info(f"初始化视频笔记上传器: 视频大小 {file_size:.2f} MB")
async def upload_video(self, page: Page):
"""上传视频"""
logger.info(f"准备上传视频: {self.video_path}")
file_size = os.path.getsize(self.video_path) / (1024 * 1024) # MB
logger.info(f"视频大小: {file_size:.2f} MB")
# 定位上传元素
upload_selectors = [
"input[type='file'][accept*='video']",
"div[class^='upload-content'] input.upload-input",
"input.upload-input",
]
upload_input = None
for selector in upload_selectors:
try:
upload_input = await page.wait_for_selector(selector, timeout=5000)
if upload_input:
logger.info(f"找到上传元素: {selector}")
break
except:
continue
if not upload_input:
raise Exception("未找到视频上传元素")
# 上传视频
logger.info("开始上传视频...")
await upload_input.set_input_files(self.video_path)
# 等待上传和转码
await self.wait_video_upload_complete(page, file_size)
logger.success("✅ 视频上传完成")
async def wait_video_upload_complete(self, page: Page, file_size_mb: float):
"""等待视频上传和转码完成"""
# 估算最大等待时间
estimated_upload_time = file_size_mb * 2 # 秒
estimated_transcode_time = 60 # 秒
max_wait_time = estimated_upload_time + estimated_transcode_time + 60
logger.info(f"预计最多等待 {max_wait_time:.0f}")
waited = 0
check_interval = 3 # 每3秒检查一次
while waited < max_wait_time:
try:
# 查找"上传成功"标识
upload_input = await page.wait_for_selector('input.upload-input', timeout=3000)
if upload_input:
preview_new = await upload_input.query_selector(
'xpath=following-sibling::div[contains(@class, "preview")]'
)
if preview_new:
stage_elements = await preview_new.query_selector_all('div.stage, div[class*="status"]')
for stage in stage_elements:
text_content = await page.evaluate(
'(element) => element.textContent',
stage
)
if '上传成功' in text_content or '转码完成' in text_content:
logger.success("检测到上传成功标识")
return True
if '上传失败' in text_content or '转码失败' in text_content:
raise Exception("视频上传或转码失败")
# 显示等待进度
if waited % 10 == 0:
logger.info(f"已等待 {waited}/{max_wait_time:.0f} 秒...")
await asyncio.sleep(check_interval)
waited += check_interval
except Exception as e:
await asyncio.sleep(check_interval)
waited += check_interval
raise TimeoutError(f"视频上传超时(等待了{max_wait_time:.0f}秒)")
async def main(self):
"""视频笔记主上传流程"""
async with async_playwright() as playwright:
browser = None
context = None
try:
# 步骤1: 创建浏览器
logger.info("[1/11] 创建浏览器环境...")
browser, context = await self.create_note_browser(playwright)
# 步骤2: 创建页面
logger.info("[2/11] 创建页面...")
page = await context.new_page()
# 步骤3: 访问发布页面
logger.info("[3/11] 访问笔记发布页面...")
await self.navigate_to_publish_page(page)
# 步骤4: 模拟人类浏览
logger.info("[4/11] 模拟浏览行为...")
await self.simulate_human_behavior(page)
# 步骤5: 上传视频
logger.info("[5/11] 上传视频...")
await self.upload_video(page)
# 步骤6: 填充标题
logger.info("[6/11] 填充标题...")
await self.fill_title(page)
# 步骤7: 填充正文
logger.info("[7/11] 填充正文...")
await self.fill_content_text(page)
# 步骤8: 添加标签
logger.info("[8/11] 添加标签...")
await self.add_tags(page)
# 步骤9: 设置地点(可选)
if self.location:
logger.info("[9/11] 设置地点...")
await self.set_location(page, self.location)
else:
logger.info("[9/11] 跳过地点设置")
# 步骤10: 设置定时发布(可选)
if self.publish_date != 0:
logger.info("[10/11] 设置定时发布...")
await self.set_schedule_time(page, self.publish_date)
else:
logger.info("[10/11] 立即发布模式")
# 步骤11: 发布
logger.info("[11/11] 发布笔记...")
await self.pre_publish_check(page)
await self.random_pause(2, 5) # 模拟犹豫
await self.publish(page)
# 等待发布成功
await self.wait_publish_success(page)
# 保存Cookie
await context.storage_state(path=self.account_file)
logger.success("✅ Cookie已更新")
logger.success("🎉 视频笔记发布成功!")
except Exception as e:
logger.error(f"❌ 发布失败: {e}")
if context and page:
await page.screenshot(path=f"error_video_note_{int(time.time())}.png", full_page=True)
raise
finally:
if context:
await context.close()
if browser:
await browser.close()