1235 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright, Page
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.log import xiaohongshu_logger
from utils.human_typing_wrapper import create_human_typer
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://creator.xiaohongshu.com/creator-micro/content/upload")
try:
await page.wait_for_url("https://creator.xiaohongshu.com/creator-micro/content/upload", timeout=5000)
except:
print("[+] 等待5秒 cookie 失效")
await context.close()
await browser.close()
return False
# 2024.06.17 抖音创作者中心改版
if await page.get_by_text('手机号登录').count() or await page.get_by_text('扫码登录').count():
print("[+] 等待5秒 cookie 失效")
return False
else:
print("[+] cookie 有效")
return True
async def xiaohongshu_setup(account_file, handle=False):
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
# Todo alert message
return False
xiaohongshu_logger.info('[+] cookie文件不存在或已失效即将自动打开浏览器请扫码登录登陆后会自动生成cookie文件')
await xiaohongshu_cookie_gen(account_file)
return True
async def xiaohongshu_cookie_gen(account_file):
async with async_playwright() as playwright:
options = {
'headless': False
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://creator.xiaohongshu.com/")
await page.pause()
# 点击调试器的继续保存cookie
await context.storage_state(path=account_file)
class XiaoHongShuVideo(object):
def __init__(self, title, file_path, tags, publish_date: datetime, account_file, thumbnail_path=None, headless=True):
self.title = title # 视频标题
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.date_format = '%Y年%m月%d%H:%M'
self.local_executable_path = LOCAL_CHROME_PATH
self.thumbnail_path = thumbnail_path
self.headless = headless
async def set_schedule_time_xiaohongshu(self, page, publish_date):
print(" [-] 正在设置定时发布时间...")
print(f"publish_date: {publish_date}")
# 使用文本内容定位元素
# element = await page.wait_for_selector(
# 'label:has-text("定时发布")',
# timeout=5000 # 5秒超时时间
# )
# await element.click()
# # 选择包含特定文本内容的 label 元素
label_element = page.locator("label:has-text('定时发布')")
# # 在选中的 label 元素下点击 checkbox
await label_element.click()
await asyncio.sleep(1)
publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
print(f"publish_date_hour: {publish_date_hour}")
await asyncio.sleep(1)
await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click()
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(publish_date_hour))
await page.keyboard.press("Enter")
await asyncio.sleep(1)
async def handle_upload_error(self, page):
xiaohongshu_logger.info('视频出错了,重新上传中')
await page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(self.file_path)
async def upload(self, playwright: Playwright) -> None:
# 使用 Chromium 浏览器启动一个浏览器实例
if self.local_executable_path:
browser = await playwright.chromium.launch(headless=self.headless, executable_path=self.local_executable_path)
else:
browser = await playwright.chromium.launch(headless=self.headless)
# 创建一个浏览器上下文,使用指定的 cookie 文件
context = await browser.new_context(
viewport={"width": 1600, "height": 900},
storage_state=f"{self.account_file}"
)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 创建人类化输入包装器
human_typer = create_human_typer(page)
# 访问指定的 URL
await page.goto("https://creator.xiaohongshu.com/publish/publish?from=homepage&target=video")
xiaohongshu_logger.info(f'[+]正在上传-------{self.title}.mp4')
# 等待页面跳转到指定的 URL没进入则自动等待到超时
xiaohongshu_logger.info(f'[-] 正在打开主页...')
await page.wait_for_url("https://creator.xiaohongshu.com/publish/publish?from=homepage&target=video")
# 点击 "上传视频" 按钮
await page.locator("div[class^='upload-content'] input[class='upload-input']").set_input_files(self.file_path)
# 等待页面跳转到指定的 URL 2025.01.08修改在原有基础上兼容两种页面
while True:
try:
# 等待upload-input元素出现
upload_input = await page.wait_for_selector('input.upload-input', timeout=3000)
# 获取下一个兄弟元素
preview_new = await upload_input.query_selector(
'xpath=following-sibling::div[contains(@class, "preview-new")]')
if preview_new:
# 在preview-new元素中查找包含"上传成功"的stage元素
stage_elements = await preview_new.query_selector_all('div.stage')
upload_success = False
for stage in stage_elements:
text_content = await page.evaluate('(element) => element.textContent', stage)
if '上传成功' in text_content:
upload_success = True
break
if upload_success:
xiaohongshu_logger.info("[+] 检测到上传成功标识!")
break # 成功检测到上传成功后跳出循环
else:
print(" [-] 未找到上传成功标识,继续等待...")
else:
print(" [-] 未找到预览元素,继续等待...")
await asyncio.sleep(1)
except Exception as e:
print(f" [-] 检测过程出错: {str(e)},重新尝试...")
await asyncio.sleep(0.5) # 等待0.5秒后重新尝试
# 填充标题和话题
# 检查是否存在包含输入框的元素
# 这里为了避免页面变化故使用相对位置定位作品标题父级右侧第一个元素的input子元素
await asyncio.sleep(1)
xiaohongshu_logger.info(f' [-] 正在填充标题和话题...')
title_container = page.locator('div.plugin.title-container').locator('input.d-text')
if await title_container.count():
# 使用人类化输入填充标题
success = await human_typer.type_text_human(
'div.plugin.title-container input.d-text',
self.title[:30],
clear_first=True
)
if not success:
xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式")
await title_container.fill(self.title[:30])
else:
# 使用人类化输入的备用方案
success = await human_typer.type_text_human(".notranslate", self.title, clear_first=True)
if not success:
xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式")
titlecontainer = page.locator(".notranslate")
await titlecontainer.click()
await page.keyboard.press("Backspace")
await page.keyboard.press("Control+KeyA")
await page.keyboard.press("Delete")
await page.keyboard.type(self.title)
await page.keyboard.press("Enter")
css_selector = "#publish-container .editor-content > div > div" # 不能加上 .ql-blank 属性,这样只能获取第一次非空状态
# 创建专门用于慢速标签输入的人类化输入包装器
from utils.human_typing_wrapper import HumanTypingWrapper
slow_config = {
'min_delay': 500, # 最小延迟150ms更慢
'max_delay': 800, # 最大延迟300ms
'pause_probability': 0.3, # 30%概率暂停
'pause_min': 500, # 暂停最少500ms
'pause_max': 1200, # 暂停最多1200ms
'correction_probability': 0.0, # 禁用错误修正
'backspace_probability': 0.0, # 禁用退格重输
}
# 创建专门的慢速输入器
slow_typer = HumanTypingWrapper(page, slow_config)
# 逐个标签输入,每个标签后都有停顿
success = True
for i, tag in enumerate(self.tags):
tag_text = f"#{tag}"
# 输入标签文本(使用慢速配置)
tag_success = await slow_typer.type_text_human(
css_selector,
tag_text,
clear_first=False
)
if not tag_success:
success = False
break
# 输入换行符并添加停顿
await page.keyboard.press("Enter")
await page.wait_for_timeout(800) # 换行后停顿800ms
xiaohongshu_logger.info(f"已输入标签: {tag} ({i+1}/{len(self.tags)})")
if not success:
xiaohongshu_logger.warning("标签人类化输入失败,使用传统方式")
await page.click(css_selector)
for index, tag in enumerate(self.tags, start=1):
#await page.type(css_selector, "#" + tag , delay=100)
await page.keyboard.type("#")
for char in tag:
await page.keyboard.type(char,delay=500)
await page.wait_for_timeout(1000)
await page.keyboard.type("\r")
#await page.wait_for_timeout(1000)
#await page.press(css_selector, "Enter")
#await page.press(css_selector, "Space")
#await page.wait_for_timeout(5000)
xiaohongshu_logger.info(f'总共添加{len(self.tags)}个话题')
# 调试用暂停10秒让你有时间观察页面上的标签是否正确输入
# await page.wait_for_timeout(10000)
# xiaohongshu_logger.info("观察时间结束,中断程序")
# # 强制中断(后续代码不执行)
# raise SystemExit("调试中断:标签输入流程完成")
# while True:
# # 判断重新上传按钮是否存在,如果不存在,代表视频正在上传,则等待
# try:
# # 新版:定位重新上传
# number = await page.locator('[class^="long-card"] div:has-text("重新上传")').count()
# if number > 0:
# xiaohongshu_logger.success(" [-]视频上传完毕")
# break
# else:
# xiaohongshu_logger.info(" [-] 正在上传视频中...")
# await asyncio.sleep(2)
# if await page.locator('div.progress-div > div:has-text("上传失败")').count():
# xiaohongshu_logger.error(" [-] 发现上传出错了... 准备重试")
# await self.handle_upload_error(page)
# except:
# xiaohongshu_logger.info(" [-] 正在上传视频中...")
# await asyncio.sleep(2)
# 上传视频封面
# await self.set_thumbnail(page, self.thumbnail_path)
#更换可见元素
await self.set_location(page, "青岛市")
# # 頭條/西瓜
# third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
# # 定位是否有第三方平台
# if await page.locator(third_part_element).count():
# # 检测是否是已选中状态
# if 'semi-switch-checked' not in await page.eval_on_selector(third_part_element, 'div => div.className'):
# await page.locator(third_part_element).locator('input.semi-switch-native-control').click()
if self.publish_date != 0:
await self.set_schedule_time_xiaohongshu(page, self.publish_date)
# 判断视频是否发布成功
while True:
try:
# 等待包含"定时发布"文本的button元素出现并点击
if self.publish_date != 0:
await page.locator('button:has-text("定时发布")').click()
else:
await page.locator('button:has-text("发布")').click()
await page.wait_for_url(
"https://creator.xiaohongshu.com/publish/success?**",
timeout=3000
) # 如果自动跳转到作品页面,则代表发布成功
xiaohongshu_logger.success(" [-]视频发布成功")
break
except:
xiaohongshu_logger.info(" [-] 视频正在发布中...")
await page.screenshot(full_page=True)
await asyncio.sleep(0.5)
await context.storage_state(path=self.account_file) # 保存cookie
xiaohongshu_logger.success(' [-]cookie更新完毕')
await asyncio.sleep(2) # 这里延迟是为了方便眼睛直观的观看
# 关闭浏览器上下文和浏览器实例
await context.close()
await browser.close()
async def set_thumbnail(self, page: Page, thumbnail_path: str):
if thumbnail_path:
await page.click('text="选择封面"')
await page.wait_for_selector("div.semi-modal-content:visible")
await page.click('text="设置竖封面"')
await page.wait_for_timeout(2000) # 等待2秒
# 定位到上传区域并点击
await page.locator("div[class^='semi-upload upload'] >> input.semi-upload-hidden-input").set_input_files(thumbnail_path)
await page.wait_for_timeout(2000) # 等待2秒
await page.locator("div[class^='extractFooter'] button:visible:has-text('完成')").click()
# finish_confirm_element = page.locator("div[class^='confirmBtn'] >> div:has-text('完成')")
# if await finish_confirm_element.count():
# await finish_confirm_element.click()
# await page.locator("div[class^='footer'] button:has-text('完成')").click()
async def set_location(self, page: Page, location: str = "青岛市"):
print(f"开始设置位置: {location}")
# 点击地点输入框
print("等待地点输入框加载...")
loc_ele = await page.wait_for_selector('div.d-text.d-select-placeholder.d-text-ellipsis.d-text-nowrap')
print(f"已定位到地点输入框: {loc_ele}")
await loc_ele.click()
print("点击地点输入框完成")
# 输入位置名称
print(f"等待1秒后输入位置名称: {location}")
await page.wait_for_timeout(1000)
await page.keyboard.type(location)
print(f"位置名称输入完成: {location}")
# 等待下拉列表加载
print("等待下拉列表加载...")
dropdown_selector = 'div.d-popover.d-popover-default.d-dropdown.--size-min-width-large'
await page.wait_for_timeout(3000)
try:
await page.wait_for_selector(dropdown_selector, timeout=3000)
print("下拉列表已加载")
except:
print("下拉列表未按预期显示,可能结构已变化")
# 增加等待时间以确保内容加载完成
print("额外等待1秒确保内容渲染完成...")
await page.wait_for_timeout(1000)
# 尝试更灵活的XPath选择器
print("尝试使用更灵活的XPath选择器...")
flexible_xpath = (
f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
f'//div[contains(@class, "d-options-wrapper")]'
f'//div[contains(@class, "d-grid") and contains(@class, "d-options")]'
f'//div[contains(@class, "name") and text()="{location}"]'
)
await page.wait_for_timeout(3000)
# 尝试定位元素
print(f"尝试定位包含'{location}'的选项...")
try:
# 先尝试使用更灵活的选择器
location_option = await page.wait_for_selector(
flexible_xpath,
timeout=3000
)
if location_option:
print(f"使用灵活选择器定位成功: {location_option}")
else:
# 如果灵活选择器失败,再尝试原选择器
print("灵活选择器未找到元素,尝试原始选择器...")
location_option = await page.wait_for_selector(
f'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
f'//div[contains(@class, "d-options-wrapper")]'
f'//div[contains(@class, "d-grid") and contains(@class, "d-options")]'
f'/div[1]//div[contains(@class, "name") and text()="{location}"]',
timeout=2000
)
# 滚动到元素并点击
print("滚动到目标选项...")
await location_option.scroll_into_view_if_needed()
print("元素已滚动到视图内")
# 增加元素可见性检查
is_visible = await location_option.is_visible()
print(f"目标选项是否可见: {is_visible}")
# 点击元素
print("准备点击目标选项...")
await location_option.click()
print(f"成功选择位置: {location}")
return True
except Exception as e:
print(f"定位位置失败: {e}")
# 打印更多调试信息
print("尝试获取下拉列表中的所有选项...")
try:
all_options = await page.query_selector_all(
'//div[contains(@class, "d-popover") and contains(@class, "d-dropdown")]'
'//div[contains(@class, "d-options-wrapper")]'
'//div[contains(@class, "d-grid") and contains(@class, "d-options")]'
'/div'
)
print(f"找到 {len(all_options)} 个选项")
# 打印前3个选项的文本内容
for i, option in enumerate(all_options[:3]):
option_text = await option.inner_text()
print(f"选项 {i+1}: {option_text.strip()[:50]}...")
except Exception as e:
print(f"获取选项列表失败: {e}")
# 截图保存(取消注释使用)
# await page.screenshot(path=f"location_error_{location}.png")
return False
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)
class XiaoHongShuImage(object):
def __init__(self, title, image_paths, tags, publish_date: datetime, account_file, location=None, content=None, headless=True, use_enhanced_typing=True):
self.title = title # 图文标题
self.image_paths = image_paths if isinstance(image_paths, list) else [image_paths] # 支持单张或多张图片
self.tags = tags
self.publish_date = publish_date
self.account_file = account_file
self.location = location # 地点信息,可以从文本文件导入
self.content = content # 正文内容,可以从文本文件导入
self.date_format = '%Y年%m月%d%H:%M'
self.local_executable_path = LOCAL_CHROME_PATH
self.headless = headless
self.use_enhanced_typing = use_enhanced_typing # 是否使用增强版输入
async def set_schedule_time_xiaohongshu(self, page, publish_date):
"""设置定时发布时间"""
xiaohongshu_logger.info(" [-] 正在设置定时发布时间...")
try:
# 定位并点击定时发布复选框
schedule_checkbox = await page.wait_for_selector('input[type="checkbox"]', timeout=3000)
await schedule_checkbox.click()
await asyncio.sleep(random.uniform(0.5, 1.0))
# 定位并点击时间输入框
date_input = await page.wait_for_selector('input[placeholder="选择日期和时间"]', timeout=3000)
await date_input.click()
await asyncio.sleep(random.uniform(0.3, 0.5))
# 输入发布时间
publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
await date_input.fill(publish_date_str)
await asyncio.sleep(random.uniform(0.3, 0.5))
await page.keyboard.press("Enter")
xiaohongshu_logger.success(f" [-] 定时发布时间设置完成: {publish_date_str}")
return True
except Exception as e:
xiaohongshu_logger.error(f" [-] 设置定时发布时间失败: {e}")
return False
async def upload_images(self, page):
"""上传图片"""
from pathlib import Path
xiaohongshu_logger.info(f'[+] 正在上传图片,共{len(self.image_paths)}')
# 等待页面加载
await asyncio.sleep(2)
try:
# 直接定位上传输入框
upload_input = await page.wait_for_selector("input[type='file']", timeout=5000)
if not upload_input:
raise Exception("未找到图片上传元素")
# 上传图片
file_names = [Path(p).name for p in self.image_paths]
xiaohongshu_logger.info(f" [-] 上传文件: {', '.join(file_names)}")
await upload_input.set_input_files(self.image_paths)
# 等待上传完成
await asyncio.sleep(2)
await self.wait_for_images_upload_complete(page)
except Exception as e:
xiaohongshu_logger.error(f"图片上传失败: {e}")
raise
async def wait_for_images_upload_complete(self, page):
"""等待图片上传完成"""
max_wait_time = 60
wait_count = 0
while wait_count < max_wait_time:
try:
# 检查图片预览
images = await page.query_selector_all('img')
valid_images = [img for img in images if await img.get_attribute('src')]
if len(valid_images) >= len(self.image_paths):
xiaohongshu_logger.success(f" [-] 图片上传完成 ({len(valid_images)}张)")
# 随机等待一小段时间确保图片完全加载
await asyncio.sleep(random.uniform(1.5, 2.5))
return
# 检查是否还在上传
loading = await page.query_selector('[class*="loading"], [class*="uploading"]')
if not loading:
# 再次检查图片数量
images = await page.query_selector_all('img')
if len(images) >= len(self.image_paths):
xiaohongshu_logger.success(" [-] 图片上传完成")
await asyncio.sleep(random.uniform(1.0, 2.0))
return
# 每10秒输出一次进度
if wait_count % 10 == 0:
xiaohongshu_logger.info(f" [-] 等待图片上传... ({wait_count}/{max_wait_time}秒)")
await asyncio.sleep(random.uniform(2.0, 3.0))
wait_count += 2
except Exception as e:
xiaohongshu_logger.error(f" [-] 检查上传状态出错: {e}")
await asyncio.sleep(random.uniform(1.0, 2.0))
wait_count += 2
raise Exception("图片上传超时")
async def locate_content_editor(self, page):
"""定位正文编辑区域"""
# 方法1基于class的精确定位
primary_selector = "div.editor-content"
# 方法2基于属性的备用定位
backup_selector = "div[contenteditable='true'][role='textbox']"
xiaohongshu_logger.info(" [-] 查找正文输入区域...")
# 尝试主选择器
try:
element = await page.wait_for_selector(primary_selector, timeout=3000)
xiaohongshu_logger.info(f" [-] 使用主选择器成功定位: {primary_selector}")
return element, primary_selector
except:
xiaohongshu_logger.warning(" [-] 主选择器定位失败,尝试备用选择器...")
# 尝试备用选择器
try:
element = await page.wait_for_selector(backup_selector, timeout=3000)
xiaohongshu_logger.info(f" [-] 使用备用选择器成功定位: {backup_selector}")
return element, backup_selector
except:
xiaohongshu_logger.error(" [-] 所有选择器都无法定位正文区域")
raise Exception("无法找到正文输入区域")
async def fill_content(self, page, human_typer):
"""填充标题和内容"""
xiaohongshu_logger.info(f' [-] 正在填充标题和话题...')
# 等待页面加载
await asyncio.sleep(2)
# 填充标题
title_container = page.locator('div.plugin.title-container').locator('input.d-text')
if await title_container.count():
# 使用人类化输入填充标题
await title_container.click()
await asyncio.sleep(0.5)
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await asyncio.sleep(0.3)
# 使用视频上传中的标题输入方式
for char in self.title[:30]:
await page.keyboard.type(char, delay=random.randint(100, 200))
await asyncio.sleep(random.uniform(0.05, 0.15))
await asyncio.sleep(0.5)
else:
# 使用备用方案
titlecontainer = page.locator(".notranslate")
await titlecontainer.click()
await asyncio.sleep(0.5)
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await asyncio.sleep(0.3)
for char in self.title:
await page.keyboard.type(char, delay=random.randint(100, 200))
await asyncio.sleep(random.uniform(0.05, 0.15))
await asyncio.sleep(0.5)
# 定位正文编辑区域
content_element, css_selector = await self.locate_content_editor(page)
# 准备正文内容
if self.content:
content_text = self.content
xiaohongshu_logger.info(f" [-] 使用自定义正文内容,长度: {len(content_text)} 字符")
else:
content_text = f"{self.title}\n\n"
xiaohongshu_logger.info(" [-] 使用默认正文内容(标题)")
try:
# 使用增强版人类输入模拟器
from utils.enhanced_human_typing import EnhancedHumanTypingSimulator
human_typer = EnhancedHumanTypingSimulator(page)
# 输入正文内容
success = await human_typer.type_text(content_text, css_selector)
if not success:
xiaohongshu_logger.error(" [-] 增强版输入失败,尝试使用备用方案")
# 点击并清空输入区域
await content_element.click()
await asyncio.sleep(random.uniform(0.3, 0.5))
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
await asyncio.sleep(random.uniform(0.2, 0.4))
# 使用简单的输入方式
for char in content_text:
await page.keyboard.type(char, delay=random.randint(100, 200))
await asyncio.sleep(random.uniform(0.05, 0.1))
xiaohongshu_logger.success(f" [-] 正文输入完成,共 {len(content_text)} 字符")
except Exception as e:
xiaohongshu_logger.error(f" [-] 正文输入失败: {e}")
raise
xiaohongshu_logger.success(f" [-] 正文输入完成,共 {len(content_text)} 字符")
# 在正文后面添加标签
xiaohongshu_logger.info(" [-] 开始在正文后面添加标签...")
# 确保光标在正文的最后位置
await content_element.click()
await asyncio.sleep(0.3)
# 移动光标到文本末尾
await page.keyboard.press("End")
await page.keyboard.press("Control+End") # 确保到达最末尾
await asyncio.sleep(0.3)
# 添加两个换行,将标签与正文分开
await page.keyboard.press("Enter")
await page.keyboard.press("Enter")
await asyncio.sleep(0.5)
# 输入标签
xiaohongshu_logger.info(f" [-] 开始输入标签 ({len(self.tags)}个)...")
# 创建专门用于慢速标签输入的人类化输入包装器
from utils.human_typing_wrapper import HumanTypingWrapper
slow_config = {
'min_delay': 500, # 最小延迟150ms更慢
'max_delay': 800, # 最大延迟300ms
'pause_probability': 0.3, # 30%概率暂停
'pause_min': 500, # 暂停最少500ms
'pause_max': 1200, # 暂停最多1200ms
'correction_probability': 0.0, # 禁用错误修正
'backspace_probability': 0.0, # 禁用退格重输
}
# 创建专门的慢速输入器
slow_typer = HumanTypingWrapper(page, slow_config)
# 逐个标签输入,每个标签后都有停顿
success = True
for i, tag in enumerate(self.tags):
tag_text = f"#{tag}"
# 输入标签文本(使用慢速配置)
# 先输入#号需要按Shift+3
await page.keyboard.press("Shift")
await asyncio.sleep(random.uniform(0.1, 0.2))
await page.keyboard.press("Digit3")
await page.keyboard.up("Shift")
await asyncio.sleep(random.uniform(0.2, 0.4))
# 输入标签内容
tag_success = await slow_typer.type_text_human(
css_selector,
tag,
clear_first=False
)
if not tag_success:
success = False
break
# 输入换行符并添加停顿
await page.keyboard.press("Enter")
await page.wait_for_timeout(800) # 换行后停顿800ms
xiaohongshu_logger.info(f"已输入标签: {tag} ({i+1}/{len(self.tags)})")
if not success:
xiaohongshu_logger.warning("标签人类化输入失败,使用传统方式")
await page.click(css_selector)
for index, tag in enumerate(self.tags, start=1):
# 输入#号需要按Shift+3
await page.keyboard.press("Shift")
await asyncio.sleep(random.uniform(0.1, 0.2))
await page.keyboard.press("Digit3")
await page.keyboard.up("Shift")
await asyncio.sleep(random.uniform(0.2, 0.4))
for char in tag:
await page.keyboard.type(char, delay=500)
await page.wait_for_timeout(1000)
await page.keyboard.press("Enter")
xiaohongshu_logger.success(f' [-] 标签输入完成 ({len(self.tags)}个)')
async def _handle_tag_suggestions_after_input(self, page: Page, tag: str) -> None:
"""标签输入后处理建议选择"""
try:
import random
await asyncio.sleep(random.uniform(0.5, 1.0))
suggestion_found = await self._handle_tag_suggestions(page, tag)
if not suggestion_found:
await asyncio.sleep(random.uniform(0.2, 0.5))
await page.keyboard.press("Enter")
except Exception as e:
xiaohongshu_logger.debug(f" [-] 标签建议处理出错: {e}")
async def _fallback_tag_input(self, page: Page, css_selector: str) -> None:
"""备用标签输入方法"""
try:
import random
await page.click(css_selector)
await asyncio.sleep(0.5)
for index, tag in enumerate(self.tags, start=1):
# 输入标签(移除详细日志)
await page.keyboard.type("#")
await asyncio.sleep(random.uniform(0.1, 0.3))
for char in tag:
await page.keyboard.type(char, delay=random.randint(300, 600))
await asyncio.sleep(random.uniform(0.8, 1.2))
# 标签间分隔
if index < len(self.tags):
await page.keyboard.type(" ")
await asyncio.sleep(random.uniform(0.3, 0.6))
except Exception as e:
xiaohongshu_logger.error(f" [-] 备用标签输入失败: {e}")
async def _input_long_content_in_segments(self, page: Page, content_typer, css_selector: str, content_text: str) -> bool:
"""
分段输入长文本,模拟真实的写作过程
Args:
page: Playwright页面对象
content_typer: 人类化输入包装器
css_selector: 内容区域选择器
content_text: 要输入的文本内容
Returns:
bool: 是否输入成功
"""
try:
import random
# 按段落分割文本(简化日志)
paragraphs = content_text.split('\n\n')
# 清空输入区域
await content_typer.clear_element(css_selector)
await asyncio.sleep(0.5)
for i, paragraph in enumerate(paragraphs, 1):
if not paragraph.strip():
continue
# 输入段落内容(移除详细日志)
success = await content_typer.type_text_human(
css_selector, paragraph, clear_first=False
)
if not success:
xiaohongshu_logger.warning(f" [-] 第 {i} 段落输入失败")
return False
# 段落间添加换行和思考时间
if i < len(paragraphs):
# 先停顿一下,模拟思考下一段内容
await asyncio.sleep(random.uniform(2.0, 4.0))
# 输入第一个换行,短暂停顿
await page.keyboard.press("Enter")
await asyncio.sleep(random.uniform(0.3, 0.6))
# 输入第二个换行,再次短暂停顿
await page.keyboard.press("Enter")
await asyncio.sleep(random.uniform(0.5, 1.0))
# 段落间再次思考
await asyncio.sleep(random.uniform(1.5, 3.0))
xiaohongshu_logger.success(" [-] 分段输入完成")
return True
except Exception as e:
xiaohongshu_logger.error(f" [-] 分段输入失败: {e}")
return False
async def _fallback_human_typing(self, page: Page, content_text: str) -> None:
"""
备用的人类化输入方法
Args:
page: Playwright页面对象
content_text: 要输入的文本内容
"""
import random
char_count = 0
for char in content_text:
await page.keyboard.type(char)
char_count += 1
# 随机延迟,模拟人类打字
delay = random.randint(50, 120) # 50-120ms随机延迟
await asyncio.sleep(delay / 1000)
# 偶尔暂停,模拟思考(移除详细日志)
if random.random() < 0.05: # 5%概率暂停
pause_time = random.randint(300, 800)
await asyncio.sleep(pause_time / 1000)
# 减少进度日志频率每100个字符显示一次
if char_count % 100 == 0:
xiaohongshu_logger.debug(f" [-] 输入进度: {char_count}/{len(content_text)}")
async def _input_single_tag(self, page: Page, tag: str, current: int, total: int) -> None:
"""
输入单个标签并智能处理建议选择(增强人类化行为)
Args:
page: Playwright页面对象
tag: 标签内容
current: 当前标签序号
total: 总标签数量
"""
import random
tag_text = f"#{tag}"
xiaohongshu_logger.info(f" [-] 输入标签: {tag_text} ({current}/{total})")
try:
# 🔧 1. 标签输入前的思考暂停(模拟用户思考下一个标签)
if current > 1: # 第一个标签不需要思考时间
think_time = random.uniform(0.5, 2.0) # 0.5-2秒思考时间
xiaohongshu_logger.debug(f" [-] 思考下一个标签... ({think_time:.1f}秒)")
await asyncio.sleep(think_time)
# 🔧 2. 更人类化的逐字符输入
await self._human_like_tag_typing(page, tag_text)
# 🔧 3. 输入完成后的短暂停顿(模拟用户检查输入)
check_pause = random.uniform(0.3, 0.8)
await asyncio.sleep(check_pause)
# 🔧 4. 等待并处理标签建议(随机等待时间)
suggestion_wait = random.uniform(0.6, 1.2) # 0.6-1.2秒随机等待
xiaohongshu_logger.debug(f" [-] 等待标签建议... ({suggestion_wait:.1f}秒)")
await asyncio.sleep(suggestion_wait)
# 5. 查找标签建议
suggestion_found = await self._handle_tag_suggestions(page, tag)
if suggestion_found:
xiaohongshu_logger.info(f" [-] 选择了匹配的标签建议: {tag}")
else:
# 🔧 没有匹配建议时的犹豫行为
hesitate_time = random.uniform(0.2, 0.6) # 犹豫0.2-0.6秒
xiaohongshu_logger.debug(f" [-] 未找到建议,犹豫中... ({hesitate_time:.1f}秒)")
await asyncio.sleep(hesitate_time)
xiaohongshu_logger.info(f" [-] 未找到匹配建议,生成新标签: {tag}")
await page.keyboard.press("Enter")
await asyncio.sleep(random.uniform(0.2, 0.5)) # 随机确认时间
# 🔧 6. 标签间的自然间隔
if current < total:
# 模拟用户在标签间的自然停顿
inter_tag_pause = random.uniform(0.3, 0.8)
await asyncio.sleep(inter_tag_pause)
await page.keyboard.type(" ")
# 空格后的微小停顿
space_pause = random.uniform(0.1, 0.3)
await asyncio.sleep(space_pause)
xiaohongshu_logger.info(f" [-] 标签处理完成: {tag} ({current}/{total})")
except Exception as e:
xiaohongshu_logger.error(f" [-] 输入标签 {tag} 时出错: {e}")
# 出错时也要模拟人类的反应时间
await asyncio.sleep(random.uniform(0.2, 0.5))
await page.keyboard.press("Enter")
await asyncio.sleep(random.uniform(0.3, 0.6))
async def _human_like_tag_typing(self, page: Page, tag_text: str) -> None:
"""
更人类化的标签输入方法
Args:
page: Playwright页面对象
tag_text: 要输入的标签文本
"""
import random
# 模拟不同的打字节奏
for i, char in enumerate(tag_text):
# 🔧 更宽泛的延迟范围,模拟真实打字速度变化
if char == '#':
# 输入#号时稍慢一些用户需要按shift+3
delay = random.randint(120, 250)
elif char.isalpha():
# 字母输入相对较快
delay = random.randint(60, 180)
else:
# 其他字符(数字、符号)稍慢
delay = random.randint(80, 200)
await page.keyboard.type(char)
await asyncio.sleep(delay / 1000)
# 🔧 模拟偶尔的打字错误和修正2%概率)
if random.random() < 0.02 and i < len(tag_text) - 1:
# 打错一个字符
wrong_char = random.choice('abcdefghijklmnopqrstuvwxyz')
await page.keyboard.type(wrong_char)
await asyncio.sleep(random.randint(100, 300) / 1000)
# 发现错误,退格删除
await asyncio.sleep(random.uniform(0.2, 0.5)) # 发现错误的反应时间
await page.keyboard.press("Backspace")
await asyncio.sleep(random.randint(80, 150) / 1000)
xiaohongshu_logger.debug(f" [-] 模拟打字错误并修正")
# 🔧 模拟偶尔的思考停顿5%概率)
if random.random() < 0.05 and i < len(tag_text) - 1:
pause_time = random.uniform(0.3, 0.8)
xiaohongshu_logger.debug(f" [-] 输入中思考停顿 ({pause_time:.1f}秒)")
await asyncio.sleep(pause_time)
async def _handle_tag_suggestions(self, page: Page, tag: str) -> bool:
"""
处理标签建议选择
Args:
page: Playwright页面对象
tag: 标签内容
Returns:
bool: 是否找到并选择了匹配的建议
"""
try:
xiaohongshu_logger.info(f" [-] 查找标签 '{tag}' 的建议...")
# 查找标签建议容器的多种可能选择器
suggestion_selectors = [
'div[class*="suggestion"]',
'div[class*="dropdown"]',
'div[class*="popover"]',
'ul[class*="options"]',
'div[class*="tag-suggestion"]',
'[role="listbox"]',
'[role="menu"]'
]
suggestion_container = None
for selector in suggestion_selectors:
try:
suggestion_container = await page.wait_for_selector(selector, timeout=1000)
if suggestion_container:
xiaohongshu_logger.debug(f" 找到建议容器: {selector}")
break
except:
continue
if not suggestion_container:
xiaohongshu_logger.debug(" [-] 未找到标签建议容器")
return False
# 查找匹配的标签建议
suggestion_items = await page.query_selector_all(
f'{suggestion_selectors[0]} div, '
f'{suggestion_selectors[0]} li, '
f'{suggestion_selectors[0]} span'
)
xiaohongshu_logger.debug(f" [-] 找到 {len(suggestion_items)} 个建议项")
# 寻找最佳匹配
best_match = None
exact_match = None
for item in suggestion_items:
try:
item_text = await item.inner_text()
if not item_text:
continue
# 清理文本(移除#号和额外空格)
clean_text = item_text.strip()
if clean_text.startswith('#'):
clean_text = clean_text[1:].strip()
xiaohongshu_logger.debug(f" 建议项: {clean_text}")
# 精确匹配
if clean_text == tag:
exact_match = item
xiaohongshu_logger.info(f" [-] 找到精确匹配: {clean_text}")
break
# 包含匹配(作为备选)
if tag in clean_text or clean_text in tag:
if not best_match:
best_match = item
xiaohongshu_logger.debug(f" 备选匹配: {clean_text}")
except Exception as e:
xiaohongshu_logger.debug(f" 处理建议项时出错: {e}")
continue
# 选择最佳匹配
selected_item = exact_match or best_match
if selected_item:
try:
# 🔧 模拟用户查看和选择建议的过程
import random
# 查看建议的时间(用户需要读取和比较)
review_time = random.uniform(0.4, 1.0)
xiaohongshu_logger.debug(f" [-] 查看标签建议... ({review_time:.1f}秒)")
await asyncio.sleep(review_time)
# 偶尔犹豫一下是否选择这个建议10%概率)
if random.random() < 0.1:
hesitate_time = random.uniform(0.3, 0.7)
xiaohongshu_logger.debug(f" [-] 犹豫是否选择建议... ({hesitate_time:.1f}秒)")
await asyncio.sleep(hesitate_time)
await selected_item.click()
# 点击后的确认时间
confirm_time = random.uniform(0.2, 0.5)
await asyncio.sleep(confirm_time)
xiaohongshu_logger.info(f" [-] 成功选择标签建议")
return True
except Exception as e:
xiaohongshu_logger.warning(f" [-] 点击标签建议失败: {e}")
return False
else:
xiaohongshu_logger.debug(f" [-] 未找到匹配的标签建议")
return False
except Exception as e:
xiaohongshu_logger.debug(f" [-] 处理标签建议时出错: {e}")
return False
async def set_location(self, page: Page, location: str) -> bool:
"""设置地理位置信息"""
xiaohongshu_logger.info(f" [-] 开始设置地理位置: {location}")
try:
# 定位并点击地点选择框
location_input = await page.wait_for_selector('div[class*="d-select"]', timeout=3000)
await location_input.click()
await asyncio.sleep(random.uniform(0.3, 0.5))
# 输入地点名称
await page.keyboard.type(location)
await asyncio.sleep(random.uniform(1.0, 1.5)) # 等待下拉列表加载
# 选择第一个匹配的选项
option = await page.wait_for_selector(f'div[class*="name"]:has-text("{location}")', timeout=3000)
if option:
await option.click()
xiaohongshu_logger.success(f" [-] 成功选择地点: {location}")
return True
xiaohongshu_logger.warning(f" [-] 未找到匹配的地点选项: {location}")
return False
except Exception as e:
xiaohongshu_logger.error(f" [-] 设置地理位置失败: {e}")
return False
async def upload(self, playwright: Playwright) -> None:
"""主要的上传流程"""
# 使用 Chromium 浏览器启动一个浏览器实例
if self.local_executable_path:
browser = await playwright.chromium.launch(
headless=self.headless,
executable_path=self.local_executable_path
)
else:
browser = await playwright.chromium.launch(
headless=self.headless
)
# 创建一个浏览器上下文,使用基本配置
context = await browser.new_context(
viewport={"width": 1600, "height": 900},
storage_state=f"{self.account_file}"
)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 🔧 创建人类化输入包装器(关键修复)
if self.use_enhanced_typing:
from utils.enhanced_human_typing import EnhancedHumanTypingSimulator
human_typer = EnhancedHumanTypingSimulator(page)
xiaohongshu_logger.info(" [-] 已创建增强版人类化输入模拟器")
else:
human_typer = create_human_typer(page)
xiaohongshu_logger.info(" [-] 已创建标准人类化输入包装器")
try:
# 直接访问小红书图文发布页面
await page.goto("https://creator.xiaohongshu.com/publish/publish?from=tab_switch&target=image")
xiaohongshu_logger.info(f'[+]正在上传图文-------{self.title}')
# 等待页面加载
xiaohongshu_logger.info(f'[-] 正在打开图文发布页面...')
await page.wait_for_url("https://creator.xiaohongshu.com/publish/publish*")
await asyncio.sleep(2) # 等待页面完全加载
# 上传图片
await self.upload_images(page)
# 等待页面稳定
await asyncio.sleep(3) # 增加等待时间,确保页面稳定
# 填充内容(传递人类化输入包装器)
await self.fill_content(page, human_typer)
# 设置位置(如果有指定地点)
if self.location and self.location.strip():
xiaohongshu_logger.info(f" [-] 开始设置地理位置: {self.location}")
await self.set_location(page, self.location)
else:
xiaohongshu_logger.info(" [-] 未指定地点或地点为空,跳过位置设置")
# 设置定时发布(如果需要)
if self.publish_date != 0:
await self.set_schedule_time_xiaohongshu(page, self.publish_date)
except Exception as e:
xiaohongshu_logger.error(f"页面操作出错: {e}")
# 保存页面截图以便调试
await page.screenshot(path="error_screenshot.png", full_page=True)
raise
# 发布图文
xiaohongshu_logger.info(" [-] 准备发布图文...")
await asyncio.sleep(random.uniform(0.5, 1.0)) # 发布前等待
try:
# 定位并点击发布按钮
button_text = "定时发布" if self.publish_date != 0 else "发布"
publish_button = await page.wait_for_selector(f'button:has-text("{button_text}")', timeout=3000)
if not publish_button:
raise Exception(f"未找到{button_text}按钮")
# 点击发布按钮
await publish_button.click()
await asyncio.sleep(random.uniform(0.5, 1.0))
# 等待发布成功
success_url = "https://creator.xiaohongshu.com/publish/success"
await page.wait_for_url(f"{success_url}?**", timeout=5000)
xiaohongshu_logger.success(" [-] 图文发布成功")
except Exception as e:
xiaohongshu_logger.error(f" [-] 发布失败: {e}")
# 保存错误截图
await page.screenshot(path="publish_error.png", full_page=True)
raise
# 保存cookie并关闭浏览器
await context.storage_state(path=self.account_file)
xiaohongshu_logger.success(' [-]cookie更新完毕')
await asyncio.sleep(2)
await context.close()
await browser.close()
async def main(self):
async with async_playwright() as playwright:
await self.upload(playwright)