1154 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
小红书笔记上传器 - 主实现
完全模仿视频上传器的实现,强化反爬虫能力
支持图文笔记和视频笔记
"""
import os
import time
import random
import asyncio
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from playwright.async_api import Playwright, async_playwright, Page
from conf import LOCAL_CHROME_PATH, BASE_DIR
from utils.base_social_media import set_init_script
from utils.anti_detection import create_stealth_browser, create_stealth_context
from utils.human_typing_wrapper import create_human_typer, HumanTypingWrapper
from utils.log import xiaohongshu_logger as logger
# ============================================================================
# Cookie管理
# ============================================================================
async def cookie_auth(account_file: str) -> bool:
    """
    Check whether the cookies stored in ``account_file`` are still accepted.

    A headless browser built by ``create_stealth_browser`` opens the creator
    publish page with a context created from ``account_file``. The cookies are
    reported as invalid when the page does not stay on the publish URL within
    5 seconds, or when a login prompt text is present on the page.

    Args:
        account_file: Path of the cookie / storage-state file.

    Returns:
        bool: True when the session looks valid; False when it looks expired
        or when any error occurs during the check.
    """
    try:
        async with async_playwright() as playwright:
            # Anti-detection browser
            browser = await create_stealth_browser(
                playwright,
                headless=True,
                custom_args=['--disable-blink-features=AutomationControlled']
            )
            context = None
            try:
                # Anti-detection context loaded from the cookie file
                context = await create_stealth_context(
                    browser,
                    account_file=account_file,
                    headless=True,
                    custom_options={
                        'viewport': {'width': 1920, 'height': 1080},
                        'locale': 'zh-CN',
                        'timezone_id': 'Asia/Shanghai',
                    }
                )
                # Inject the stealth init script
                context = await set_init_script(context)
                page = await context.new_page()

                # Open the creator center publish page
                await page.goto("https://creator.xiaohongshu.com/publish/publish")
                try:
                    await page.wait_for_url(
                        "https://creator.xiaohongshu.com/publish/publish**",
                        timeout=5000
                    )
                except Exception:
                    # `except Exception` (not a bare except) so that task
                    # cancellation and KeyboardInterrupt still propagate.
                    logger.warning("[+] Cookie可能失效")
                    return False

                # A visible login prompt means the session is gone
                if await page.get_by_text('手机号登录').count() or await page.get_by_text('扫码登录').count():
                    logger.warning("[+] 检测到登录页面Cookie失效")
                    return False

                logger.info("[+] Cookie有效")
                return True
            finally:
                # Single cleanup point for every exit path, including errors
                if context is not None:
                    await context.close()
                await browser.close()
    except Exception as e:
        logger.error(f"Cookie验证失败: {e}")
        return False
async def xiaohongshu_note_setup(account_file: str, handle: bool = False) -> bool:
    """
    Make sure a usable cookie file exists for the note uploader.

    Args:
        account_file: Path of the cookie file.
        handle: When True, an invalid or missing cookie file triggers an
            interactive login via ``xiaohongshu_note_cookie_gen``; when
            False the function only reports the problem.

    Returns:
        bool: True when the cookie file is valid or the login flow was run;
        False when the cookie is missing/invalid and ``handle`` is False.
    """
    # `and` short-circuits, so validation is only attempted for existing files
    cookie_ready = os.path.exists(account_file) and await cookie_auth(account_file)
    if cookie_ready:
        return True

    if handle:
        logger.info('[+] Cookie文件不存在或已失效即将自动打开浏览器请扫码登录')
        await xiaohongshu_note_cookie_gen(account_file)
        return True

    logger.warning('[!] Cookie文件不存在或已失效')
    return False
async def xiaohongshu_note_cookie_gen(account_file: str):
    """
    Open a headed browser for a manual login and save the resulting cookies.

    The creator site is opened and the page is paused (``page.pause()``) so
    the user can log in; once the pause ends, the context storage state is
    written to ``account_file``.

    Args:
        account_file: Path where the cookie / storage-state file is saved.
            Missing parent directories are created.
    """
    async with async_playwright() as playwright:
        # Anti-detection browser
        browser = await create_stealth_browser(
            playwright,
            headless=False,  # a headed browser is required for manual login
            custom_args=[
                '--disable-blink-features=AutomationControlled',
                '--lang=zh-CN',
            ]
        )
        try:
            # Fresh context without any stored cookies
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'locale': 'zh-CN',
                'timezone_id': 'Asia/Shanghai',
                'device_scale_factor': 1,
                'has_touch': False,
                'is_mobile': False,
            }
            # Use a realistic desktop User-Agent in headed mode as well
            context_options['user_agent'] = random.choice([
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            ])
            context = await browser.new_context(**context_options)
            # Inject the stealth init script
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto("https://creator.xiaohongshu.com/")

            # Block here until the user has finished logging in
            await page.pause()

            # Save the cookies; create the target directory first so a
            # first-time run does not fail on a missing folder
            Path(account_file).parent.mkdir(parents=True, exist_ok=True)
            await context.storage_state(path=account_file)
            logger.success(f'[+] Cookie已保存到: {account_file}')
            await context.close()
        finally:
            # Always release the browser, even when login or saving failed
            await browser.close()
# ============================================================================
# 基础笔记类
# ============================================================================
class XiaoHongShuNote:
    """Base class for the Xiaohongshu note uploaders.

    Stores the fields shared by image and video notes and implements the
    common page steps (navigation, title, body, tags, location, schedule,
    publish). Subclasses implement the media upload and ``main``.
    """

    # Candidate selectors for the title input, tried in order.
    # Reference markup:
    # <input class="d-text" type="text" placeholder="填写标题会有更多赞哦~" value="">
    _TITLE_SELECTORS = (
        'input.d-text[type="text"][placeholder="填写标题会有更多赞哦~"]',
        'input.d-text[placeholder*="标题"]',
        'div.plugin.title-container input.d-text',
        '.notranslate',
    )

    # Candidate selectors for the TipTap body editor, tried in order.
    # Reference markup:
    # <div contenteditable="true" role="textbox" translate="no" class="tiptap ProseMirror" tabindex="0">
    _CONTENT_SELECTORS = (
        'div.tiptap.ProseMirror[contenteditable="true"]',
        'div[contenteditable="true"][role="textbox"].tiptap',
        'div.editor-container div.tiptap[contenteditable="true"]',
        '#publish-container .editor-content > div > div',
    )

    def __init__(
        self,
        title: str,
        content: str,
        tags: List[str],
        note_type: str,
        publish_date,
        account_file: str,
        location: Optional[str] = None,
        headless: bool = False
    ):
        """
        Initialise the uploader.

        Args:
            title: Note title (truncated to 30 characters).
            content: Note body text (truncated to 1000 characters).
            tags: Topic tags (only the first 3 are kept).
            note_type: Note type, 'image' or 'video'.
            publish_date: Scheduled publish time; 0 means publish immediately.
            account_file: Path of the cookie file.
            location: Optional location name.
            headless: Whether to run the browser headless (not recommended).
        """
        self.title = title[:30]  # at most 30 characters
        self.content = content[:1000]  # at most 1000 characters
        self.tags = tags[:3]  # at most 3 tags
        self.note_type = note_type
        self.publish_date = publish_date
        self.account_file = account_file
        self.location = location
        self.headless = headless
        self.local_executable_path = LOCAL_CHROME_PATH
        logger.info(f"初始化笔记上传器: {note_type}, 标题={title[:20]}...")

    async def create_note_browser(self, playwright: Playwright):
        """Create the anti-detection browser and context.

        Returns:
            tuple: ``(browser, context)``; the context is created from
            ``self.account_file``.
        """
        logger.info("创建反检测浏览器...")
        # NOTE(review): '--no-sandbox' and '--disable-web-security' weaken the
        # browser's isolation; confirm they are really required.
        custom_args = [
            '--disable-blink-features=AutomationControlled',
            '--disable-dev-shm-usage',
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-web-security',
            '--disable-features=IsolateOrigins,site-per-process',
            '--lang=zh-CN',
            '--window-size=1920,1080',
        ]
        browser = await create_stealth_browser(
            playwright,
            headless=self.headless,
            executable_path=self.local_executable_path,
            custom_args=custom_args
        )
        context = await create_stealth_context(
            browser,
            account_file=self.account_file,
            headless=self.headless,
            custom_options={
                'viewport': {'width': 1920, 'height': 1080},
                'locale': 'zh-CN',
                'timezone_id': 'Asia/Shanghai',
                'device_scale_factor': 1,
                'has_touch': False,
                'is_mobile': False,
            }
        )
        return browser, context

    async def random_pause(self, min_sec: float = 1.0, max_sec: float = 3.0):
        """Sleep for a random duration between ``min_sec`` and ``max_sec`` seconds."""
        wait_time = random.uniform(min_sec, max_sec)
        await asyncio.sleep(wait_time)

    async def simulate_human_behavior(self, page: Page):
        """Move the mouse a few times and scroll a little, with random delays."""
        logger.info("模拟人类浏览行为...")
        # Random mouse moves
        for _ in range(random.randint(2, 4)):
            await page.mouse.move(
                random.randint(100, 800),
                random.randint(100, 600)
            )
            await asyncio.sleep(random.uniform(0.3, 1.0))
        # Random scroll
        await page.mouse.wheel(0, random.randint(-100, 100))
        await asyncio.sleep(random.uniform(0.5, 1.5))

    async def navigate_to_publish_page(self, page: Page):
        """Open the publish page that matches ``self.note_type``."""
        logger.info("访问笔记发布页面...")
        if self.note_type == 'image':
            url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=image"
        else:
            url = "https://creator.xiaohongshu.com/publish/publish?from=menu&target=video"
        await page.goto(url)
        await page.wait_for_load_state('domcontentloaded')
        logger.success(f"✅ 已打开发布页面: {url}")

    async def _wait_for_first_selector(self, page: Page, selectors, timeout: int = 3000):
        """Return ``(element, selector)`` for the first selector that resolves.

        Each selector is waited on for ``timeout`` milliseconds. Returns
        ``(None, None)`` when none of them resolves.
        """
        for selector in selectors:
            try:
                element = await page.wait_for_selector(selector, timeout=timeout)
            except Exception:
                # Timeout or selector error: try the next candidate.
                # `except Exception` lets task cancellation propagate.
                continue
            if element:
                return element, selector
        return None, None

    @staticmethod
    def _xpath_literal(value: str) -> str:
        """Quote ``value`` as an XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        kinds is expressed with ``concat()``.
        """
        if '"' not in value:
            return f'"{value}"'
        if "'" not in value:
            return f"'{value}'"
        pieces = (f'"{piece}"' for piece in value.split('"'))
        return "concat(" + ", '\"', ".join(pieces) + ")"

    async def fill_title(self, page: Page):
        """Type the title into the title input with human-like timing.

        Raises:
            Exception: When no title input can be found.
        """
        logger.info("填充标题...")
        title_input, matched_selector = await self._wait_for_first_selector(
            page, self._TITLE_SELECTORS
        )
        if not title_input:
            raise Exception("未找到标题输入框")
        logger.info(f"找到标题输入框: {matched_selector}")

        # Focus the input first
        await title_input.click()
        await self.random_pause(0.3, 0.8)

        normal_typer = create_human_typer(page, {
            'min_delay': 80,
            'max_delay': 150,
            'pause_probability': 0.15,
            'pause_min': 300,
            'pause_max': 800,
        })
        # Type into the selector that actually matched, not blindly the
        # first candidate in the list.
        success = await normal_typer.type_text_human(
            matched_selector,
            self.title,
            clear_first=True
        )
        if not success:
            logger.warning("人类化输入失败,使用传统方式")
            await title_input.click()
            await page.keyboard.press("Control+A")
            await page.keyboard.press("Delete")
            await page.keyboard.type(self.title, delay=100)
        logger.success(f"✅ 标题填充完成: {self.title}")
        await self.random_pause(0.5, 1.5)

    async def fill_content_text(self, page: Page):
        """Fill the body editor by setting its text directly.

        Falls back to typing with the keyboard when the direct assignment
        raises. Does nothing (apart from a warning) when no editor is found.
        """
        logger.info("填充正文...")
        content_input, matched_selector = await self._wait_for_first_selector(
            page, self._CONTENT_SELECTORS
        )
        if not content_input:
            logger.warning("未找到正文输入框,跳过")
            return
        logger.info(f"找到正文输入框: {matched_selector}")

        # Focus the editor
        await content_input.click()
        await self.random_pause(0.3, 0.8)

        try:
            # The text is passed as an evaluate argument instead of being
            # spliced into the script source, so backslashes, backticks or
            # "${...}" in the content cannot break or inject JavaScript.
            await content_input.evaluate(
                '''
                (element, text) => {
                    element.textContent = text;
                    element.dispatchEvent(new Event('input', { bubbles: true }));
                }
                ''',
                self.content
            )
            logger.success(f"✅ 正文填充完成(直接粘贴,{len(self.content)} 字符)")
        except Exception as e:
            logger.warning(f"直接粘贴失败: {e},使用键盘输入")
            # Fallback: plain keyboard typing without human-like delays
            await page.keyboard.type(self.content, delay=0)
            logger.success(f"✅ 正文填充完成(键盘输入,{len(self.content)} 字符)")
        await self.random_pause(0.5, 1.0)

    async def add_tags(self, page: Page):
        """Type each tag as ``#tag`` into the body editor, very slowly."""
        logger.info("添加话题标签...")
        # Tags are typed into the same TipTap editor as the body text
        tag_selector = self._CONTENT_SELECTORS[0]

        # Tag input uses a much slower typing profile
        slow_typer = HumanTypingWrapper(page, {
            'min_delay': 500,
            'max_delay': 800,
            'pause_probability': 0.3,
            'pause_min': 500,
            'pause_max': 1200,
            'correction_probability': 0.0,
            'backspace_probability': 0.0,
        })

        # Start the tags on a new line when there is body text
        if self.content:
            await page.keyboard.press("Enter")
            await self.random_pause(0.5, 1.0)

        for i, tag in enumerate(self.tags):
            logger.info(f"输入标签 {i+1}/{len(self.tags)}: {tag}")
            tag_text = f"#{tag}"
            success = await slow_typer.type_text_human(
                tag_selector,
                tag_text,
                clear_first=False
            )
            if not success:
                logger.warning(f"标签 {tag} 人类化输入失败,使用传统方式")
                await page.keyboard.type("#")
                for char in tag:
                    await page.keyboard.type(char, delay=600)
                    await asyncio.sleep(0.1)
            # Confirm the tag with Enter
            await page.keyboard.press("Enter")
            # Pause 800 ms between tags
            await page.wait_for_timeout(800)
            logger.info(f"✅ 标签 {tag} 添加完成")
        logger.success(f"✅ 所有标签添加完成 (共 {len(self.tags)} 个)")
        await self.random_pause(1, 2)

    async def set_location(self, page: Page, location: str):
        """Pick a publish location from the location dropdown.

        Tries an exact name match first and falls back to the first option.

        Returns:
            bool: True when an option was clicked, False otherwise (errors
            are logged, not raised).
        """
        logger.info(f"设置地点: {location}")
        try:
            loc_selectors = [
                'div.d-text.d-select-placeholder',
                'input[placeholder*="地点"]',
                'div:has-text("添加地点")',
            ]
            loc_element, _ = await self._wait_for_first_selector(page, loc_selectors)
            if not loc_element:
                logger.warning("未找到地点输入框,跳过")
                return False

            await loc_element.click()
            await self.random_pause(0.5, 1.0)

            # Type the location name
            await page.keyboard.type(location, delay=200)
            logger.info(f"已输入地点名称: {location}")

            # Give the dropdown time to load
            await asyncio.sleep(3)

            dropdown_xpath = (
                '//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
                '//div[contains(@class, "d-options-wrapper")]'
            )
            # The name is quoted via _xpath_literal so a quote character in
            # the location cannot break the expression.
            exact_xpath = (
                f'{dropdown_xpath}'
                f'//div[contains(@class, "name") and text()={self._xpath_literal(location)}]'
            )
            try:
                location_option = await page.wait_for_selector(
                    exact_xpath,
                    timeout=5000
                )
                if location_option:
                    await location_option.scroll_into_view_if_needed()
                    await asyncio.sleep(0.5)
                    await location_option.click()
                    logger.success(f"✅ 地点设置成功: {location}")
                    await self.random_pause(0.5, 1.0)
                    return True
            except Exception:
                logger.warning("未找到匹配的地点,尝试选择第一个选项")

            # Fall back to the first suggested option
            first_option_xpath = f'{dropdown_xpath}//div[contains(@class, "d-option-item")][1]'
            first_option = await page.query_selector(first_option_xpath)
            if first_option:
                await first_option.click()
                logger.info("已选择第一个推荐地点")
                return True

            logger.warning("地点设置失败,跳过")
            return False
        except Exception as e:
            logger.error(f"设置地点失败: {e}")
            return False

    async def set_schedule_time(self, page: Page, publish_date: datetime):
        """Switch to scheduled publishing and type the publish time.

        Raises:
            Exception: Re-raises any error after logging it.
        """
        logger.info(f"设置定时发布: {publish_date}")
        try:
            logger.info(" [-] 正在设置定时发布时间...")
            # Click the "scheduled publish" label
            schedule_label = page.locator("label:has-text('定时发布')")
            await schedule_label.click()
            await asyncio.sleep(1)

            # Time format: YYYY-MM-DD HH:MM
            publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
            logger.info(f"发布时间: {publish_date_hour}")
            await asyncio.sleep(1)

            # Replace the content of the date-time input
            await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click()
            await page.keyboard.press("Control+KeyA")
            await page.keyboard.type(str(publish_date_hour))
            await page.keyboard.press("Enter")
            await asyncio.sleep(1)
            logger.success(f"✅ 定时发布时间设置完成: {publish_date_hour}")
        except Exception as e:
            logger.error(f"设置定时发布失败: {e}")
            raise

    async def pre_publish_check(self, page: Page):
        """Log whether the title and tags appear to be filled in.

        This is informational only: failed checks are logged as warnings
        and never abort publishing.
        """
        logger.info("执行发布前检查...")
        checks = []

        # Title: try the original generic selector first, then the selectors
        # fill_title uses (the documented input markup has class "d-text",
        # which the generic selector does not match).
        title_ok = False
        try:
            for selector in ('input[class*="title"]',) + self._TITLE_SELECTORS:
                title_field = await page.query_selector(selector)
                if title_field:
                    title_ok = len(await title_field.input_value()) > 0
                    break
        except Exception:
            title_ok = False
        checks.append(("标题", title_ok))

        # Tags
        try:
            tags = await page.query_selector_all('span[class*="topic"], span[class*="tag"]')
            checks.append(("标签", len(tags) > 0))
        except Exception:
            checks.append(("标签", False))

        # Report the results
        for name, passed in checks:
            status = "✅" if passed else "❌"
            logger.info(f"{status} {name}: {'已填充' if passed else '未填充'}")

        failed_checks = [name for name, passed in checks if not passed]
        if failed_checks:
            logger.warning(f"以下项目未完成: {', '.join(failed_checks)}")

    async def publish(self, page: Page):
        """Click the publish (or scheduled-publish) button.

        Raises:
            Exception: Re-raises any error after logging it.
        """
        logger.info("准备发布...")
        try:
            # The button label depends on whether a schedule is set
            publish_button_text = "定时发布" if self.publish_date != 0 else "发布"
            publish_button = await page.wait_for_selector(
                f'button:has-text("{publish_button_text}")',
                timeout=10000
            )
            # Hesitate briefly before clicking
            await self.random_pause(1, 3)
            await publish_button.click()
            logger.info(f"已点击'{publish_button_text}'按钮")
        except Exception as e:
            logger.error(f"点击发布按钮失败: {e}")
            raise

    async def wait_publish_success(self, page: Page):
        """Wait up to 30 seconds for the redirect to the success page.

        Returns:
            bool: True once the success URL is reached.

        Raises:
            Exception: Re-raises the wait error after saving a screenshot.
        """
        logger.info("等待发布完成...")
        max_wait = 30  # seconds
        try:
            await page.wait_for_url(
                "https://creator.xiaohongshu.com/publish/success?**",
                timeout=max_wait * 1000
            )
            logger.success("✅ 笔记发布成功!")
            return True
        except Exception as e:
            logger.error(f"发布失败或超时: {e}")
            # Keep a screenshot for diagnosis
            screenshot_path = f"publish_failed_{int(time.time())}.png"
            await page.screenshot(path=screenshot_path, full_page=True)
            logger.info(f"错误截图已保存: {screenshot_path}")
            raise

    async def main(self):
        """Run the full upload flow; must be implemented by subclasses."""
        raise NotImplementedError("子类必须实现main方法")
# ============================================================================
# 图文笔记类
# ============================================================================
class XiaoHongShuImageNote(XiaoHongShuNote):
    """Uploader for Xiaohongshu image notes."""

    def __init__(
        self,
        title: str,
        content: str,
        tags: List[str],
        image_paths: List[str],
        publish_date,
        account_file: str,
        cover_index: int = 0,
        filter_name: Optional[str] = None,
        location: Optional[str] = None,
        headless: bool = False
    ):
        """
        Initialise the image-note uploader.

        Args:
            image_paths: Image file paths (1 to 9 entries).
            cover_index: Index of the cover image.
            filter_name: Optional filter name.
            Other arguments are passed to the base class unchanged.

        Raises:
            ValueError: When ``image_paths`` is empty or has more than 9 items.
        """
        super().__init__(title, content, tags, 'image', publish_date, account_file, location, headless)
        # Validate the number of images
        if not image_paths or len(image_paths) > 9:
            raise ValueError("图片数量必须在1-9张之间")
        self.image_paths = image_paths
        self.cover_index = cover_index
        self.filter_name = filter_name
        logger.info(f"初始化图文笔记上传器: {len(image_paths)} 张图片")

    async def upload_images(self, page: Page):
        """Upload all images in one batch through the file input.

        Raises:
            FileNotFoundError: When one of the image files does not exist.
            Exception: When no upload input can be found.
        """
        logger.info(f"准备上传 {len(self.image_paths)} 张图片")

        # Fail fast on missing files before spending time on the page
        for image_path in self.image_paths:
            if not os.path.exists(image_path):
                raise FileNotFoundError(f"图片文件不存在: {image_path}")

        # Reference markup:
        # <input data-v-5aa68852="" data-v-7cbccdb2-s="" class="upload-input" type="file" multiple="" accept=".jpg,.jpeg,.png">
        upload_selectors = [
            "input.upload-input[type='file'][accept*='.jpg']",
            "input.upload-input[accept*='.png']",
            "input[type='file'][multiple][accept*='.jpg,.jpeg,.png']",
            "div.upload-wrapper input.upload-input",
        ]
        upload_input = None
        for selector in upload_selectors:
            try:
                upload_input = await page.wait_for_selector(selector, timeout=5000)
            except Exception:
                # Timeout or selector error: try the next candidate.
                # `except Exception` lets task cancellation propagate.
                continue
            if upload_input:
                logger.info(f"找到上传元素: {selector}")
                break
        if not upload_input:
            raise Exception("未找到图片上传元素")

        # The input accepts multiple files, so upload everything at once
        logger.info(f"批量上传 {len(self.image_paths)} 张图片...")
        await upload_input.set_input_files(self.image_paths)

        # Wait for the previews; a timeout here is only logged as a warning
        await self.wait_all_images_preview(page, len(self.image_paths))
        logger.success("✅ 所有图片上传完成")

    async def wait_all_images_preview(self, page: Page, expected_count: int):
        """Poll once per second until ``expected_count`` previews are present.

        Returns:
            bool: True when enough preview elements were found, False after
            waiting 60 seconds without success.
        """
        max_wait = 60  # seconds
        waited = 0
        check_interval = 1  # seconds between checks
        preview_selectors = [
            'div[class*="image-item"]',
            'div[class*="photo-item"]',
            'div.upload-wrapper img',
            'img[src*="blob"]',
        ]
        logger.info(f"等待 {expected_count} 张图片预览加载...")
        while waited < max_wait:
            try:
                # Track the best count seen across selectors so the progress
                # log reports a real number instead of always 0
                loaded_count = 0
                for selector in preview_selectors:
                    previews = await page.query_selector_all(selector)
                    loaded_count = max(loaded_count, len(previews))
                    if loaded_count >= expected_count:
                        break
                if loaded_count >= expected_count:
                    logger.success(f"{loaded_count} 张图片预览已加载")
                    return True
                # Progress report every 5 seconds
                if waited % 5 == 0:
                    logger.info(f"已等待 {waited}秒,当前已加载 {loaded_count}/{expected_count} 张图片")
            except Exception as e:
                # Best-effort polling: record the error and keep waiting
                logger.debug(f"图片预览检查出错,稍后重试: {e}")
            await asyncio.sleep(check_interval)
            waited += check_interval
        logger.warning(f"图片预览等待超时(已等待{waited}秒)")
        return False

    async def wait_image_preview(self, page: Page, image_index: int):
        """Wait for the preview of a single image (deprecated, kept for compatibility).

        Returns:
            bool: True when the preview image at ``image_index`` has an
            ``http`` or ``blob`` source, False after 30 seconds.
        """
        max_wait = 30  # seconds
        waited = 0
        while waited < max_wait:
            try:
                preview_container = await page.query_selector_all(
                    'div[class*="preview"], div[class*="image-item"], div[class*="photo"]'
                )
                if len(preview_container) > image_index:
                    # A populated src is taken as the "loaded" marker
                    img_element = await preview_container[image_index].query_selector('img')
                    if img_element:
                        src = await img_element.get_attribute('src')
                        if src and src.startswith(('http', 'blob')):
                            logger.info(f"图片 {image_index+1} 预览加载完成")
                            return True
            except Exception as e:
                # Best-effort polling: record the error and keep waiting
                logger.debug(f"图片预览检查出错,稍后重试: {e}")
            await asyncio.sleep(0.5)
            waited += 0.5
        logger.warning(f"图片 {image_index+1} 预览超时")
        return False

    async def set_cover(self, page: Page):
        """Placeholder for cover selection; currently only logs."""
        if self.cover_index == 0:
            logger.info("使用默认封面(第一张图片)")
            return
        logger.info(f"设置封面: 第 {self.cover_index+1} 张图片")
        try:
            # TODO: implement against the real page structure; the site
            # uses the first image as the cover by default.
            logger.info("图文笔记默认使用第一张图片作为封面")
        except Exception as e:
            logger.error(f"设置封面失败: {e}")

    async def apply_filters(self, page: Page):
        """Placeholder for applying a filter; currently only logs."""
        if not self.filter_name:
            logger.info("未设置滤镜,跳过")
            return
        logger.info(f"应用滤镜: {self.filter_name}")
        try:
            # TODO: implement against the real page structure
            logger.info("滤镜功能待实现")
        except Exception as e:
            logger.error(f"应用滤镜失败: {e}")

    async def main(self):
        """Run the complete image-note upload flow.

        Raises:
            Exception: Re-raises any failure after logging it and, when a
            page exists, saving a screenshot.
        """
        async with async_playwright() as playwright:
            browser = None
            context = None
            # Bound up front so the except block can test it even when the
            # failure happens before the page is created
            page = None
            try:
                # Step 1: browser
                logger.info("[1/11] 创建浏览器环境...")
                browser, context = await self.create_note_browser(playwright)
                # Step 2: page
                logger.info("[2/11] 创建页面...")
                page = await context.new_page()
                # Step 3: publish page
                logger.info("[3/11] 访问笔记发布页面...")
                await self.navigate_to_publish_page(page)
                # Step 4: human-like browsing
                logger.info("[4/11] 模拟浏览行为...")
                await self.simulate_human_behavior(page)
                # Step 5: images
                logger.info("[5/11] 上传图片...")
                await self.upload_images(page)
                # Step 6: title
                logger.info("[6/11] 填充标题...")
                await self.fill_title(page)
                # Step 7: body
                logger.info("[7/11] 填充正文...")
                await self.fill_content_text(page)
                # Step 8: tags
                logger.info("[8/11] 添加标签...")
                await self.add_tags(page)
                # Step 9: location (optional)
                if self.location:
                    logger.info("[9/11] 设置地点...")
                    await self.set_location(page, self.location)
                else:
                    logger.info("[9/11] 跳过地点设置")
                # Step 10: schedule (optional)
                if self.publish_date != 0:
                    logger.info("[10/11] 设置定时发布...")
                    await self.set_schedule_time(page, self.publish_date)
                else:
                    logger.info("[10/11] 立即发布模式")
                # Step 11: publish
                logger.info("[11/11] 发布笔记...")
                await self.pre_publish_check(page)
                await self.random_pause(2, 5)  # hesitate like a human
                await self.publish(page)
                await self.wait_publish_success(page)
                # Persist the refreshed cookies
                await context.storage_state(path=self.account_file)
                logger.success("✅ Cookie已更新")
                logger.success("🎉 图文笔记发布成功!")
            except Exception as e:
                logger.error(f"❌ 发布失败: {e}")
                if context and page:
                    await page.screenshot(path=f"error_image_note_{int(time.time())}.png", full_page=True)
                raise
            finally:
                if context:
                    await context.close()
                if browser:
                    await browser.close()
# ============================================================================
# 视频笔记类
# ============================================================================
class XiaoHongShuVideoNote(XiaoHongShuNote):
    """Uploader for Xiaohongshu video notes."""

    def __init__(
        self,
        title: str,
        content: str,
        tags: List[str],
        video_path: str,
        publish_date,
        account_file: str,
        thumbnail_path: Optional[str] = None,
        location: Optional[str] = None,
        headless: bool = False
    ):
        """
        Initialise the video-note uploader.

        Args:
            video_path: Path of the video file.
            thumbnail_path: Optional cover image path (stored only; no
                method in this class uses it yet).
            Other arguments are passed to the base class unchanged.

        Raises:
            FileNotFoundError: When ``video_path`` does not exist.
        """
        super().__init__(title, content, tags, 'video', publish_date, account_file, location, headless)
        self.video_path = video_path
        self.thumbnail_path = thumbnail_path
        # Validate the file
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"视频文件不存在: {video_path}")
        file_size = os.path.getsize(video_path) / (1024 * 1024)  # MB
        logger.info(f"初始化视频笔记上传器: 视频大小 {file_size:.2f} MB")

    async def upload_video(self, page: Page):
        """Upload the video and wait until upload and transcoding finish.

        Raises:
            Exception: When no upload input can be found.
            RuntimeError: When the page reports a failed upload/transcode.
            TimeoutError: When completion is not detected in time.
        """
        logger.info(f"准备上传视频: {self.video_path}")
        file_size = os.path.getsize(self.video_path) / (1024 * 1024)  # MB
        logger.info(f"视频大小: {file_size:.2f} MB")

        # Locate the upload input
        upload_selectors = [
            "input[type='file'][accept*='video']",
            "div[class^='upload-content'] input.upload-input",
            "input.upload-input",
        ]
        upload_input = None
        for selector in upload_selectors:
            try:
                upload_input = await page.wait_for_selector(selector, timeout=5000)
            except Exception:
                # Timeout or selector error: try the next candidate.
                # `except Exception` lets task cancellation propagate.
                continue
            if upload_input:
                logger.info(f"找到上传元素: {selector}")
                break
        if not upload_input:
            raise Exception("未找到视频上传元素")

        logger.info("开始上传视频...")
        await upload_input.set_input_files(self.video_path)

        # Wait for upload and transcoding
        await self.wait_video_upload_complete(page, file_size)
        logger.success("✅ 视频上传完成")

    async def wait_video_upload_complete(self, page: Page, file_size_mb: float):
        """Poll the page until the upload is reported as finished.

        The wait budget is ``file_size_mb * 2 + 120`` seconds, counted in
        3-second polling steps.

        Returns:
            bool: True once a success marker is found.

        Raises:
            RuntimeError: When a failure marker is found.
            TimeoutError: When no marker appears within the wait budget.
        """
        # Estimate the maximum wait
        estimated_upload_time = file_size_mb * 2  # seconds
        estimated_transcode_time = 60  # seconds
        max_wait_time = estimated_upload_time + estimated_transcode_time + 60
        logger.info(f"预计最多等待 {max_wait_time:.0f}")

        waited = 0
        check_interval = 3  # seconds between checks
        while waited < max_wait_time:
            upload_failed = False
            try:
                upload_input = await page.wait_for_selector('input.upload-input', timeout=3000)
                if upload_input:
                    preview_new = await upload_input.query_selector(
                        'xpath=following-sibling::div[contains(@class, "preview")]'
                    )
                    if preview_new:
                        stage_elements = await preview_new.query_selector_all('div.stage, div[class*="status"]')
                        for stage in stage_elements:
                            # textContent can be null; normalise to ''
                            text_content = await page.evaluate(
                                '(element) => element.textContent',
                                stage
                            ) or ''
                            if '上传成功' in text_content or '转码完成' in text_content:
                                logger.success("检测到上传成功标识")
                                return True
                            if '上传失败' in text_content or '转码失败' in text_content:
                                upload_failed = True
                                break
            except Exception as e:
                # Best-effort polling: record the error and keep waiting
                logger.debug(f"检查上传状态出错,稍后重试: {e}")

            # Raised outside the try block so the polling handler above
            # cannot swallow a detected failure
            if upload_failed:
                raise RuntimeError("视频上传或转码失败")

            # Progress report
            if waited % 10 == 0:
                logger.info(f"已等待 {waited}/{max_wait_time:.0f} 秒...")
            await asyncio.sleep(check_interval)
            waited += check_interval
        raise TimeoutError(f"视频上传超时(等待了{max_wait_time:.0f}秒)")

    async def main(self):
        """Run the complete video-note upload flow.

        Raises:
            Exception: Re-raises any failure after logging it and, when a
            page exists, saving a screenshot.
        """
        async with async_playwright() as playwright:
            browser = None
            context = None
            # Bound up front so the except block can test it even when the
            # failure happens before the page is created
            page = None
            try:
                # Step 1: browser
                logger.info("[1/11] 创建浏览器环境...")
                browser, context = await self.create_note_browser(playwright)
                # Step 2: page
                logger.info("[2/11] 创建页面...")
                page = await context.new_page()
                # Step 3: publish page
                logger.info("[3/11] 访问笔记发布页面...")
                await self.navigate_to_publish_page(page)
                # Step 4: human-like browsing
                logger.info("[4/11] 模拟浏览行为...")
                await self.simulate_human_behavior(page)
                # Step 5: video
                logger.info("[5/11] 上传视频...")
                await self.upload_video(page)
                # Step 6: title
                logger.info("[6/11] 填充标题...")
                await self.fill_title(page)
                # Step 7: body
                logger.info("[7/11] 填充正文...")
                await self.fill_content_text(page)
                # Step 8: tags
                logger.info("[8/11] 添加标签...")
                await self.add_tags(page)
                # Step 9: location (optional)
                if self.location:
                    logger.info("[9/11] 设置地点...")
                    await self.set_location(page, self.location)
                else:
                    logger.info("[9/11] 跳过地点设置")
                # Step 10: schedule (optional)
                if self.publish_date != 0:
                    logger.info("[10/11] 设置定时发布...")
                    await self.set_schedule_time(page, self.publish_date)
                else:
                    logger.info("[10/11] 立即发布模式")
                # Step 11: publish
                logger.info("[11/11] 发布笔记...")
                await self.pre_publish_check(page)
                await self.random_pause(2, 5)  # hesitate like a human
                await self.publish(page)
                await self.wait_publish_success(page)
                # Persist the refreshed cookies
                await context.storage_state(path=self.account_file)
                logger.success("✅ Cookie已更新")
                logger.success("🎉 视频笔记发布成功!")
            except Exception as e:
                logger.error(f"❌ 发布失败: {e}")
                if context and page:
                    await page.screenshot(path=f"error_video_note_{int(time.time())}.png", full_page=True)
                raise
            finally:
                if context:
                    await context.close()
                if browser:
                    await browser.close()