""" 批量发布示例 这个示例展示如何批量发布多篇笔记 """ import asyncio from datetime import datetime, timedelta from publisher import XiaoHongShuImageNote async def publish_single_note(note_data, account_file): """发布单篇笔记""" try: note = XiaoHongShuImageNote( title=note_data["title"], content=note_data["content"], tags=note_data["tags"], image_paths=note_data["images"], publish_date=note_data.get("publish_date", 0), account_file=account_file, location=note_data.get("location"), headless=True # 批量发布建议使用无头模式 ) await note.main() print(f"✅ [{note_data['title']}] 发布成功") return True except Exception as e: print(f"❌ [{note_data['title']}] 发布失败: {e}") return False async def main(): """批量发布主函数""" print("=" * 70) print("📦 小红书笔记批量发布示例") print("=" * 70) # Cookie文件路径 account_file = "cookies/account.json" # 准备要发布的笔记列表 notes = [ { "title": "美食分享第一弹 🍔", "content": "今天发现的宝藏餐厅,必须分享给大家!", "tags": ["#美食", "#探店", "#广州"], "images": ["../videos/image.jpg"], # 替换为实际图片 "location": "天河区" }, { "title": "周末出游记录 ✨", "content": "周末去了超美的公园,拍了好多照片!", "tags": ["#旅行", "#周末", "#打卡"], "images": ["../videos/image.jpg"], # 替换为实际图片 }, { "title": "日常穿搭分享 👗", "content": "最近很喜欢的一套搭配~", "tags": ["#穿搭", "#OOTD", "#日常"], "images": ["../videos/image.jpg"], # 替换为实际图片 }, ] print(f"\n📋 准备发布 {len(notes)} 篇笔记") print("💡 每篇笔记之间会自动间隔30秒,避免触发风控\n") # 统计结果 success_count = 0 failed_count = 0 # 逐个发布 for i, note_data in enumerate(notes, 1): print(f"\n{'='*70}") print(f"正在发布第 {i}/{len(notes)} 篇: {note_data['title']}") print(f"{'='*70}") result = await publish_single_note(note_data, account_file) if result: success_count += 1 else: failed_count += 1 # 如果不是最后一篇,等待一段时间 if i < len(notes): wait_time = 30 # 间隔30秒 print(f"\n⏰ 等待 {wait_time} 秒后发布下一篇...") await asyncio.sleep(wait_time) # 输出汇总 print("\n" + "=" * 70) print("📊 批量发布完成!") print("=" * 70) print(f"✅ 成功: {success_count} 篇") print(f"❌ 失败: {failed_count} 篇") print(f"📱 总计: {len(notes)} 篇") if success_count > 0: print("\n🎉 可以打开小红书App查看你的笔记了!") async def batch_with_schedule(): """批量定时发布示例""" print("=" * 70) print("⏰ 批量定时发布示例") print("=" * 70) account_file = "cookies/account.json" # 设置发布时间:今天下午3点、4点、5点 today = datetime.now() publish_times = [ today.replace(hour=15, minute=0, second=0), today.replace(hour=16, minute=0, second=0), today.replace(hour=17, minute=0, second=0), ] notes = [ { "title": f"定时笔记 {i+1}", "content": f"这是第{i+1}篇定时发布的笔记", "tags": ["#定时发布"], "images": ["../videos/image.jpg"], "publish_date": publish_times[i] } for i in range(3) ] print(f"\n📋 准备设置 {len(notes)} 篇定时笔记") for i, note_data in enumerate(notes, 1): print(f"\n正在设置第 {i} 篇...") print(f" 标题: {note_data['title']}") print(f" 发布时间: {note_data['publish_date'].strftime('%Y-%m-%d %H:%M')}") await publish_single_note(note_data, account_file) if i < len(notes): await asyncio.sleep(30) print("\n✅ 所有定时笔记已设置完成!") if __name__ == "__main__": print("请选择运行模式:") print("1. 批量立即发布") print("2. 批量定时发布") choice = input("\n请输入选项 (1/2): ").strip() if choice == "1": asyncio.run(main()) elif choice == "2": asyncio.run(batch_with_schedule()) else: print("❌ 无效选项")