""" 基础使用示例 演示如何使用社交媒体自动发布器的基本功能。 """ import asyncio import sys from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from core.publisher import Publisher, create_publisher from core.models import ImageNote, VideoContent, PlatformType from utils.logger import setup_logger # 设置日志 setup_logger(level="INFO") async def basic_xiaohongshu_example(): """小红书基础使用示例""" print("=" * 50) print("小红书图文笔记发布示例") print("=" * 50) # 创建发布器 async with create_publisher(headless=False) as publisher: # 1. 设置小红书账号(首次使用需要扫码登录) print("1. 设置小红书账号...") success = await publisher.setup_platform("xiaohongshu", "your_account_name") if not success: print("❌ 小红书账号设置失败") return print("✅ 小红书账号设置成功") # 2. 创建图文笔记内容 print("\n2. 创建图文笔记内容...") note = ImageNote( title="我的第一篇自动化笔记", description="这是使用社交媒体自动发布器发布的第一篇笔记,内容很精彩!", images=[ "media/images/example1.jpg", # 替换为实际图片路径 "media/images/example2.jpg", # 替换为实际图片路径 ], tags=["自动化", "Python", "测试"] ) # 3. 发布内容 print("\n3. 发布内容...") result = await publisher.publish("xiaohongshu", note, "your_account_name") if result.success: print(f"✅ 发布成功!") print(f" 任务ID: {result.task_id}") print(f" 耗时: {result.duration:.2f}秒") if result.published_url: print(f" 链接: {result.published_url}") else: print(f"❌ 发布失败: {result.message}") if result.error_details: print(f" 错误详情: {result.error_details}") async def basic_douyin_example(): """抖音基础使用示例""" print("=" * 50) print("抖音视频发布示例") print("=" * 50) # 创建发布器 async with create_publisher(headless=False) as publisher: # 1. 设置抖音账号(首次使用需要扫码登录) print("1. 设置抖音账号...") success = await publisher.setup_platform("douyin", "your_account_name") if not success: print("❌ 抖音账号设置失败") return print("✅ 抖音账号设置成功") # 2. 创建视频内容 print("\n2. 创建视频内容...") video = VideoContent( title="我的第一个自动化视频", description="这是使用社交媒体自动发布器发布的第一个视频,希望大家喜欢!", video_path="media/videos/example.mp4", # 替换为实际视频路径 tags=["自动化", "视频", "分享"] ) # 3. 发布内容 print("\n3. 发布内容...") result = await publisher.publish("douyin", video, "your_account_name") if result.success: print(f"✅ 发布成功!") print(f" 任务ID: {result.task_id}") print(f" 耗时: {result.duration:.2f}秒") if result.published_url: print(f" 链接: {result.published_url}") else: print(f"❌ 发布失败: {result.message}") if result.error_details: print(f" 错误详情: {result.error_details}") async def quick_publish_examples(): """快速发布示例""" print("=" * 50) print("快速发布示例") print("=" * 50) # 快速发布小红书图文笔记 print("1. 快速发布小红书图文笔记...") try: from core.publisher import publish_to_xiaohongshu result = await publish_to_xiaohongshu( title="快速发布测试笔记", description="这是一个快速发布测试笔记", images=["media/images/quick_test.jpg"], # 替换为实际图片路径 tags=["快速", "测试"], account_name="your_account_name", headless=False ) if result.success: print(f"✅ 小红书快速发布成功: {result.task_id}") else: print(f"❌ 小红书快速发布失败: {result.message}") except Exception as e: print(f"❌ 小红书快速发布异常: {e}") # 快速发布抖音视频 print("\n2. 快速发布抖音视频...") try: from core.publisher import publish_to_douyin result = await publish_to_douyin( title="快速发布测试视频", description="这是一个快速发布测试视频", video_path="media/videos/quick_test.mp4", # 替换为实际视频路径 tags=["快速", "测试"], account_name="your_account_name", headless=False ) if result.success: print(f"✅ 抖音快速发布成功: {result.task_id}") else: print(f"❌ 抖音快速发布失败: {result.message}") except Exception as e: print(f"❌ 抖音快速发布异常: {e}") async def batch_publish_example(): """批量发布示例""" print("=" * 50) print("批量发布示例") print("=" * 50) from core.models import PublishTask # 创建批量任务 tasks = [ PublishTask( id="task_1", platform=PlatformType.XIAOHONGSHU, account=None, # 需要实际的AccountInfo对象 content=ImageNote( title="批量测试笔记1", description="这是批量发布的第1篇笔记", images=["media/images/batch1.jpg"], tags=["批量", "测试1"] ) ), PublishTask( id="task_2", platform=PlatformType.DOUYIN, account=None, # 需要实际的AccountInfo对象 content=VideoContent( title="批量测试视频1", description="这是批量发布的第1个视频", video_path="media/videos/batch1.mp4", tags=["批量", "视频1"] ) ) ] # 注意:实际使用时需要提供有效的AccountInfo对象 print("批量发布需要配置有效的账号信息") print("请参考其他示例先设置账号") async def platform_info_example(): """平台信息示例""" print("=" * 50) print("平台信息示例") print("=" * 50) async with create_publisher() as publisher: # 获取支持的平台 platforms = await publisher.get_supported_platforms() print(f"支持的平台: {[p.value for p in platforms]}") # 测试登录状态 for platform in platforms: is_logged_in = await publisher.test_login(platform, "your_account_name") print(f"{platform.value} 登录状态: {'✅ 已登录' if is_logged_in else '❌ 未登录'}") # 内容验证示例 note = ImageNote( title="测试标题", description="测试描述", images=["test.jpg"] ) for platform in platforms: is_valid, error_msg = await publisher.validate_content(platform, note) print(f"{platform.value} 内容验证: {'✅ 有效' if is_valid else f'❌ 无效: {error_msg}'}") async def main(): """主函数""" print("🚀 社交媒体自动发布器 - 基础使用示例") print("注意:请将示例中的文件路径和账号名称替换为实际值") try: # 运行小红书示例 # await basic_xiaohongshu_example() # 运行抖音示例 # await basic_douyin_example() # 运行快速发布示例 # await quick_publish_examples() # 运行平台信息示例 await platform_info_example() except KeyboardInterrupt: print("\n⏹️ 用户中断") except Exception as e: print(f"\n❌ 运行异常: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())