xiaohongshu_note_uploader/example_batch.py

161 lines
4.8 KiB
Python
Raw Permalink Normal View History

2025-11-07 15:04:47 +08:00
"""
批量发布示例
这个示例展示如何批量发布多篇笔记
"""
import asyncio
from datetime import datetime, timedelta
from publisher import XiaoHongShuImageNote
async def publish_single_note(note_data, account_file):
"""发布单篇笔记"""
try:
note = XiaoHongShuImageNote(
title=note_data["title"],
content=note_data["content"],
tags=note_data["tags"],
image_paths=note_data["images"],
publish_date=note_data.get("publish_date", 0),
account_file=account_file,
location=note_data.get("location"),
headless=True # 批量发布建议使用无头模式
)
await note.main()
print(f"✅ [{note_data['title']}] 发布成功")
return True
except Exception as e:
print(f"❌ [{note_data['title']}] 发布失败: {e}")
return False
async def main():
"""批量发布主函数"""
print("=" * 70)
print("📦 小红书笔记批量发布示例")
print("=" * 70)
# Cookie文件路径
account_file = "cookies/account.json"
# 准备要发布的笔记列表
notes = [
{
"title": "美食分享第一弹 🍔",
"content": "今天发现的宝藏餐厅,必须分享给大家!",
"tags": ["#美食", "#探店", "#广州"],
"images": ["../videos/image.jpg"], # 替换为实际图片
"location": "天河区"
},
{
"title": "周末出游记录 ✨",
"content": "周末去了超美的公园,拍了好多照片!",
"tags": ["#旅行", "#周末", "#打卡"],
"images": ["../videos/image.jpg"], # 替换为实际图片
},
{
"title": "日常穿搭分享 👗",
"content": "最近很喜欢的一套搭配~",
"tags": ["#穿搭", "#OOTD", "#日常"],
"images": ["../videos/image.jpg"], # 替换为实际图片
},
]
print(f"\n📋 准备发布 {len(notes)} 篇笔记")
print("💡 每篇笔记之间会自动间隔30秒避免触发风控\n")
# 统计结果
success_count = 0
failed_count = 0
# 逐个发布
for i, note_data in enumerate(notes, 1):
print(f"\n{'='*70}")
print(f"正在发布第 {i}/{len(notes)} 篇: {note_data['title']}")
print(f"{'='*70}")
result = await publish_single_note(note_data, account_file)
if result:
success_count += 1
else:
failed_count += 1
# 如果不是最后一篇,等待一段时间
if i < len(notes):
wait_time = 30 # 间隔30秒
print(f"\n⏰ 等待 {wait_time} 秒后发布下一篇...")
await asyncio.sleep(wait_time)
# 输出汇总
print("\n" + "=" * 70)
print("📊 批量发布完成!")
print("=" * 70)
print(f"✅ 成功: {success_count}")
print(f"❌ 失败: {failed_count}")
print(f"📱 总计: {len(notes)}")
if success_count > 0:
print("\n🎉 可以打开小红书App查看你的笔记了")
async def batch_with_schedule():
"""批量定时发布示例"""
print("=" * 70)
print("⏰ 批量定时发布示例")
print("=" * 70)
account_file = "cookies/account.json"
# 设置发布时间今天下午3点、4点、5点
today = datetime.now()
publish_times = [
today.replace(hour=15, minute=0, second=0),
today.replace(hour=16, minute=0, second=0),
today.replace(hour=17, minute=0, second=0),
]
notes = [
{
"title": f"定时笔记 {i+1}",
"content": f"这是第{i+1}篇定时发布的笔记",
"tags": ["#定时发布"],
"images": ["../videos/image.jpg"],
"publish_date": publish_times[i]
}
for i in range(3)
]
print(f"\n📋 准备设置 {len(notes)} 篇定时笔记")
for i, note_data in enumerate(notes, 1):
print(f"\n正在设置第 {i} 篇...")
print(f" 标题: {note_data['title']}")
print(f" 发布时间: {note_data['publish_date'].strftime('%Y-%m-%d %H:%M')}")
await publish_single_note(note_data, account_file)
if i < len(notes):
await asyncio.sleep(30)
print("\n✅ 所有定时笔记已设置完成!")
if __name__ == "__main__":
print("请选择运行模式:")
print("1. 批量立即发布")
print("2. 批量定时发布")
choice = input("\n请输入选项 (1/2): ").strip()
if choice == "1":
asyncio.run(main())
elif choice == "2":
asyncio.run(batch_with_schedule())
else:
print("❌ 无效选项")