339 lines
12 KiB
Python
339 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
高级演示测试 - 展示项目核心功能
|
||
"""
|
||
|
||
import asyncio
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from dataclasses import dataclass, field
|
||
from enum import Enum
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
# 使用我们之前测试过的可用模型定义
|
||
class PlatformType(Enum):
|
||
XIAOHONGSHU = "xiaohongshu"
|
||
DOUYIN = "douyin"
|
||
|
||
@dataclass
|
||
class ImageNote:
|
||
title: str
|
||
description: str
|
||
images: List[str] = field(default_factory=list)
|
||
tags: List[str] = field(default_factory=list)
|
||
visibility: str = "public"
|
||
cover_image: Optional[str] = None
|
||
|
||
@dataclass
|
||
class VideoContent:
|
||
title: str
|
||
description: str
|
||
video_path: str
|
||
tags: List[str] = field(default_factory=list)
|
||
visibility: str = "public"
|
||
cover_image: Optional[str] = None
|
||
|
||
@dataclass
|
||
class AccountInfo:
|
||
platform: PlatformType
|
||
username: str
|
||
cookie_file: str
|
||
is_active: bool = True
|
||
|
||
class SimplePublisher:
|
||
"""简化的发布器演示类"""
|
||
|
||
def __init__(self):
|
||
self.browser = None
|
||
self.page = None
|
||
|
||
async def setup_browser(self, headless=False):
|
||
"""设置浏览器"""
|
||
try:
|
||
from playwright.async_api import async_playwright
|
||
|
||
self.playwright = await async_playwright().start()
|
||
self.browser = await self.playwright.chromium.launch(
|
||
headless=headless,
|
||
args=[
|
||
'--no-sandbox',
|
||
'--disable-blink-features=AutomationControlled'
|
||
]
|
||
)
|
||
|
||
# 设置用户代理
|
||
context = await self.browser.new_context(
|
||
user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
|
||
)
|
||
|
||
self.page = await context.new_page()
|
||
|
||
# 注入反检测脚本
|
||
await self.page.add_init_script("""
|
||
Object.defineProperty(navigator, 'webdriver', {
|
||
get: () => undefined,
|
||
});
|
||
""")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 浏览器设置失败: {e}")
|
||
return False
|
||
|
||
async def test_platform_access(self, platform, account_info):
|
||
"""测试平台访问(不登录)"""
|
||
print(f"\n🌐 测试{platform.value}平台访问")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
if platform == PlatformType.XIAOHONGSHU:
|
||
# 访问小红书创作者中心
|
||
await self.page.goto("https://creator.xiaohongshu.com/", timeout=10000)
|
||
await asyncio.sleep(2)
|
||
|
||
# 检查页面是否正常加载
|
||
title = await self.page.title()
|
||
print(f"✅ 页面加载成功: {title}")
|
||
|
||
# 检查是否有登录相关元素
|
||
login_elements = await self.page.query_selector_all('[class*="login"], [class*="auth"], button:has-text("登录")')
|
||
if login_elements:
|
||
print(f"✅ 检测到 {len(login_elements)} 个登录相关元素")
|
||
|
||
return True
|
||
|
||
elif platform == PlatformType.DOUYIN:
|
||
# 访问抖音创作者中心
|
||
await self.page.goto("https://creator.douyin.com/", timeout=10000)
|
||
await asyncio.sleep(2)
|
||
|
||
# 检查页面是否正常加载
|
||
title = await self.page.title()
|
||
print(f"✅ 页面加载成功: {title}")
|
||
|
||
# 检查是否有登录相关元素
|
||
login_elements = await self.page.query_selector_all('[class*="login"], [class*="auth"], button:has-text("登录")')
|
||
if login_elements:
|
||
print(f"✅ 检测到 {len(login_elements)} 个登录相关元素")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 平台访问失败: {e}")
|
||
return False
|
||
|
||
async def simulate_content_creation(self, content, platform):
|
||
"""模拟内容创建过程(不实际提交)"""
|
||
print(f"\n📝 模拟{platform.value}内容创建")
|
||
print("-" * 40)
|
||
|
||
try:
|
||
if platform == PlatformType.XIAOHONGSHU:
|
||
# 模拟访问图文笔记发布页面
|
||
await self.page.goto("https://creator.xiaohongshu.com/publish/publish", timeout=10000)
|
||
await asyncio.sleep(3)
|
||
|
||
# 检查是否到达发布页面
|
||
url = self.page.url
|
||
if "publish" in url:
|
||
print("✅ 成功访问图文笔记发布页面")
|
||
|
||
# 模拟查找标题输入框
|
||
title_input = await self.page.query_selector('input[placeholder*="标题"], textarea[placeholder*="标题"]')
|
||
if title_input:
|
||
print("✅ 找到标题输入框")
|
||
# 只模拟输入,不实际输入
|
||
print(f"📝 准备输入标题: {content.title}")
|
||
|
||
# 模拟查找内容输入框
|
||
content_input = await self.page.query_selector('textarea[placeholder*="内容"], textarea[placeholder*="描述"]')
|
||
if content_input:
|
||
print("✅ 找到内容输入框")
|
||
print(f"📝 准备输入描述: {content.description[:30]}...")
|
||
|
||
# 模拟查找图片上传区域
|
||
upload_area = await self.page.query_selector('[class*="upload"], [data-testid*="upload"], input[type="file"]')
|
||
if upload_area:
|
||
print("✅ 找到图片上传区域")
|
||
print(f"📷 准备上传 {len(content.images)} 张图片")
|
||
|
||
return True
|
||
else:
|
||
print("❌ 未能访问发布页面,可能需要登录")
|
||
return False
|
||
|
||
elif platform == PlatformType.DOUYIN:
|
||
# 模拟访问视频上传页面
|
||
await self.page.goto("https://creator.douyin.com/creator-micro/content/upload", timeout=10000)
|
||
await asyncio.sleep(3)
|
||
|
||
# 检查页面
|
||
url = self.page.url
|
||
if "upload" in url or "content" in url:
|
||
print("✅ 成功访问内容上传页面")
|
||
|
||
# 模拟查找标题输入框
|
||
title_input = await self.page.query_selector('input[placeholder*="标题"]')
|
||
if title_input:
|
||
print("✅ 找到标题输入框")
|
||
print(f"📝 准备输入标题: {content.title}")
|
||
|
||
# 模拟查找描述输入框
|
||
desc_input = await self.page.query_selector('textarea[placeholder*="描述"]')
|
||
if desc_input:
|
||
print("✅ 找到描述输入框")
|
||
print(f"📝 准备输入描述: {content.description[:30]}...")
|
||
|
||
# 模拟查找视频上传区域
|
||
video_upload = await self.page.query_selector('[class*="upload"], input[type="file"][accept*="video"]')
|
||
if video_upload:
|
||
print("✅ 找到视频上传区域")
|
||
print(f"🎬 准备上传视频: {content.video_path}")
|
||
|
||
return True
|
||
else:
|
||
print("❌ 未能访问上传页面,可能需要登录")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 内容创建模拟失败: {e}")
|
||
return False
|
||
|
||
async def cleanup(self):
|
||
"""清理资源"""
|
||
if self.page:
|
||
await self.page.close()
|
||
if self.browser:
|
||
await self.browser.close()
|
||
if hasattr(self, 'playwright'):
|
||
await self.playwright.stop()
|
||
|
||
def create_sample_content():
|
||
"""创建示例内容"""
|
||
contents = []
|
||
|
||
# 小红书图文笔记
|
||
xhs_note = ImageNote(
|
||
title="今日美食分享 🍜| 超好吃的家常面",
|
||
description="今天做了特别好吃的家常面,汤底浓郁,面条劲道,配上特制的酱料,简直是绝配!详细做法在图片里,大家一定要试试看~\n\n#家常美食 #面条 #美食分享 #简单易学",
|
||
images=["delicious_noodles1.jpg", "delicious_noodles2.jpg", "delicious_noodles3.jpg"],
|
||
tags=["家常美食", "面条", "美食分享", "简单易学", "今日菜单"]
|
||
)
|
||
contents.append((xhs_note, PlatformType.XIAOHONGSHU))
|
||
|
||
# 抖音视频
|
||
douyin_video = VideoContent(
|
||
title="10分钟搞定快手早餐 🥪",
|
||
description="分享一个超级简单的快手早餐做法,10分钟就能搞定,营养又美味!上班族和学生党都适合~\n\n#快手早餐 #早餐教程 #简单易学 #美食vlog",
|
||
video_path="quick_breakfast.mp4",
|
||
tags=["快手早餐", "早餐教程", "简单易学", "美食vlog", "生活技巧"]
|
||
)
|
||
contents.append((douyin_video, PlatformType.DOUYIN))
|
||
|
||
return contents
|
||
|
||
async def main():
|
||
"""主演示函数"""
|
||
print("🚀 社交媒体自动发布器 - 高级功能演示")
|
||
print("📋 这个演示会访问真实平台页面,但不会发布任何内容")
|
||
print("=" * 70)
|
||
|
||
# 创建示例内容和账号
|
||
contents = create_sample_content()
|
||
|
||
# 创建测试账号
|
||
test_accounts = {
|
||
PlatformType.XIAOHONGSHU: AccountInfo(
|
||
platform=PlatformType.XIAOHONGSHU,
|
||
username="demo_xhs_user",
|
||
cookie_file="demo_xhs_user.json"
|
||
),
|
||
PlatformType.DOUYIN: AccountInfo(
|
||
platform=PlatformType.DOUYIN,
|
||
username="demo_douyin_user",
|
||
cookie_file="demo_douyin_user.json"
|
||
)
|
||
}
|
||
|
||
# 创建发布器
|
||
publisher = SimplePublisher()
|
||
|
||
try:
|
||
# 设置浏览器(显示模式,便于观察)
|
||
print("\n🌐 1. 设置浏览器环境")
|
||
if await publisher.setup_browser(headless=False):
|
||
print("✅ 浏览器设置成功")
|
||
else:
|
||
print("❌ 浏览器设置失败")
|
||
return False
|
||
|
||
# 测试每个平台
|
||
for content, platform in contents:
|
||
print(f"\n{'='*70}")
|
||
print(f"📱 测试平台: {platform.value}")
|
||
print(f"📝 内容: {content.title}")
|
||
print(f"{'='*70}")
|
||
|
||
# 1. 测试平台访问
|
||
if await publisher.test_platform_access(platform, test_accounts[platform]):
|
||
|
||
# 2. 模拟内容创建
|
||
await publisher.simulate_content_creation(content, platform)
|
||
|
||
# 3. 显示内容预览
|
||
print(f"\n📋 内容预览:")
|
||
print(f" 标题: {content.title}")
|
||
print(f" 描述: {content.description[:100]}...")
|
||
print(f" 标签: {', '.join(content.tags)}")
|
||
|
||
if isinstance(content, ImageNote):
|
||
print(f" 图片: {len(content.images)} 张")
|
||
elif isinstance(content, VideoContent):
|
||
print(f" 视频: {content.video_path}")
|
||
|
||
print(f"\n⏳ 等待3秒后测试下一个平台...")
|
||
await asyncio.sleep(3)
|
||
|
||
else:
|
||
print(f"❌ {platform.value}平台访问失败")
|
||
|
||
print(f"\n{'='*70}")
|
||
print("🎉 演示测试完成!")
|
||
print("\n📝 演示总结:")
|
||
print("✅ 浏览器环境设置成功")
|
||
print("✅ 平台页面访问测试")
|
||
print("✅ 内容创建流程模拟")
|
||
print("✅ 发布界面元素检测")
|
||
|
||
print(f"\n📋 实际使用时:")
|
||
print("1. 准备真实的图片和视频文件")
|
||
print("2. 运行完整的登录流程(扫码登录)")
|
||
print("3. 使用真实的账号信息")
|
||
print("4. 执行实际的内容发布")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 演示过程中出现异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
finally:
|
||
# 清理资源
|
||
await publisher.cleanup()
|
||
|
||
if __name__ == "__main__":
|
||
print("⚠️ 注意: 这个演示会打开浏览器窗口,访问真实网站页面")
|
||
print("⚠️ 但不会进行任何登录操作或内容发布")
|
||
print("⚠️ 请在30秒内观察浏览器行为,或按Ctrl+C退出")
|
||
print("\n按回车键继续演示,或按Ctrl+C退出...")
|
||
try:
|
||
input()
|
||
result = asyncio.run(main())
|
||
sys.exit(0 if result else 1)
|
||
except KeyboardInterrupt:
|
||
print("\n⏹️ 用户中断演示")
|
||
sys.exit(0) |