xiaohongshu_note_uploader/example_batch.py

161 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
批量发布示例
这个示例展示如何批量发布多篇笔记
"""
import asyncio
from datetime import datetime, timedelta
from publisher import XiaoHongShuImageNote
async def publish_single_note(note_data, account_file):
"""发布单篇笔记"""
try:
note = XiaoHongShuImageNote(
title=note_data["title"],
content=note_data["content"],
tags=note_data["tags"],
image_paths=note_data["images"],
publish_date=note_data.get("publish_date", 0),
account_file=account_file,
location=note_data.get("location"),
headless=True # 批量发布建议使用无头模式
)
await note.main()
print(f"✅ [{note_data['title']}] 发布成功")
return True
except Exception as e:
print(f"❌ [{note_data['title']}] 发布失败: {e}")
return False
async def main():
"""批量发布主函数"""
print("=" * 70)
print("📦 小红书笔记批量发布示例")
print("=" * 70)
# Cookie文件路径
account_file = "cookies/account.json"
# 准备要发布的笔记列表
notes = [
{
"title": "美食分享第一弹 🍔",
"content": "今天发现的宝藏餐厅,必须分享给大家!",
"tags": ["#美食", "#探店", "#广州"],
"images": ["../videos/image.jpg"], # 替换为实际图片
"location": "天河区"
},
{
"title": "周末出游记录 ✨",
"content": "周末去了超美的公园,拍了好多照片!",
"tags": ["#旅行", "#周末", "#打卡"],
"images": ["../videos/image.jpg"], # 替换为实际图片
},
{
"title": "日常穿搭分享 👗",
"content": "最近很喜欢的一套搭配~",
"tags": ["#穿搭", "#OOTD", "#日常"],
"images": ["../videos/image.jpg"], # 替换为实际图片
},
]
print(f"\n📋 准备发布 {len(notes)} 篇笔记")
print("💡 每篇笔记之间会自动间隔30秒避免触发风控\n")
# 统计结果
success_count = 0
failed_count = 0
# 逐个发布
for i, note_data in enumerate(notes, 1):
print(f"\n{'='*70}")
print(f"正在发布第 {i}/{len(notes)} 篇: {note_data['title']}")
print(f"{'='*70}")
result = await publish_single_note(note_data, account_file)
if result:
success_count += 1
else:
failed_count += 1
# 如果不是最后一篇,等待一段时间
if i < len(notes):
wait_time = 30 # 间隔30秒
print(f"\n⏰ 等待 {wait_time} 秒后发布下一篇...")
await asyncio.sleep(wait_time)
# 输出汇总
print("\n" + "=" * 70)
print("📊 批量发布完成!")
print("=" * 70)
print(f"✅ 成功: {success_count}")
print(f"❌ 失败: {failed_count}")
print(f"📱 总计: {len(notes)}")
if success_count > 0:
print("\n🎉 可以打开小红书App查看你的笔记了")
async def batch_with_schedule():
"""批量定时发布示例"""
print("=" * 70)
print("⏰ 批量定时发布示例")
print("=" * 70)
account_file = "cookies/account.json"
# 设置发布时间今天下午3点、4点、5点
today = datetime.now()
publish_times = [
today.replace(hour=15, minute=0, second=0),
today.replace(hour=16, minute=0, second=0),
today.replace(hour=17, minute=0, second=0),
]
notes = [
{
"title": f"定时笔记 {i+1}",
"content": f"这是第{i+1}篇定时发布的笔记",
"tags": ["#定时发布"],
"images": ["../videos/image.jpg"],
"publish_date": publish_times[i]
}
for i in range(3)
]
print(f"\n📋 准备设置 {len(notes)} 篇定时笔记")
for i, note_data in enumerate(notes, 1):
print(f"\n正在设置第 {i} 篇...")
print(f" 标题: {note_data['title']}")
print(f" 发布时间: {note_data['publish_date'].strftime('%Y-%m-%d %H:%M')}")
await publish_single_note(note_data, account_file)
if i < len(notes):
await asyncio.sleep(30)
print("\n✅ 所有定时笔记已设置完成!")
if __name__ == "__main__":
print("请选择运行模式:")
print("1. 批量立即发布")
print("2. 批量定时发布")
choice = input("\n请输入选项 (1/2): ").strip()
if choice == "1":
asyncio.run(main())
elif choice == "2":
asyncio.run(batch_with_schedule())
else:
print("❌ 无效选项")