2025-11-12 00:28:07 +08:00

571 lines
20 KiB
Python

"""
抖音适配器实现
"""
import asyncio
from typing import Optional, List, Dict, Any
from pathlib import Path
from datetime import datetime
from playwright.async_api import Page
from ..base_adapter import BaseAdapter
from ...core.models import (
PlatformType,
AccountInfo,
Content,
VideoContent,
PublishResult,
UploadStatus,
PublishStatus
)
from ...auth.douyin_auth import DouyinAuth
from ...config.platform_config import get_platform_url, get_selectors, get_wait_times, get_platform_config
from ...utils.browser import browser_manager
from ...utils.human_behavior import HumanBehaviorSimulator
from ...utils.logger import get_platform_logger, get_task_logger
from ...utils.exceptions import (
UploadFailedError,
ContentRejectedError,
ElementNotFoundError,
TimeoutError,
ValidationError
)
logger = get_platform_logger("douyin")
class DouyinAdapter(BaseAdapter):
"""抖音适配器"""
def __init__(self):
super().__init__(PlatformType.DOUYIN)
self.auth = DouyinAuth()
self.human_behavior = HumanBehaviorSimulator()
self.config = get_platform_config(PlatformType.DOUYIN)
def get_authenticator(self):
"""获取认证器实例"""
return self.auth
async def login(self, account_info: AccountInfo, headless: bool = False) -> bool:
"""登录抖音"""
return await self.auth.login(account_info, headless)
async def check_login_status(self, page: Page) -> bool:
"""检查抖音登录状态"""
return await self.auth.check_login_status(page)
async def publish_content(
self,
page: Page,
content: Content,
account_info: AccountInfo
) -> PublishResult:
"""
发布内容到抖音
Args:
page: Playwright页面对象
content: 要发布的内容
account_info: 账号信息
Returns:
发布结果
"""
task_logger = get_task_logger(
f"douyin_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
self.platform_name,
account_info.username
)
start_time = asyncio.get_event_loop().time()
task_logger.start(f"开始发布内容: {content.title}")
try:
# 验证内容
is_valid, error_msg = await self.validate_content(content)
if not is_valid:
raise ValidationError(error_msg)
# 抖音目前只支持视频内容
if not isinstance(content, VideoContent):
raise ValidationError(f"抖音只支持视频内容,不支持: {type(content)}")
result = await self._publish_video(page, content, account_info, task_logger)
# 计算耗时
duration = asyncio.get_event_loop().time() - start_time
result.duration = duration
if result.success:
task_logger.success(f"视频发布成功,耗时: {duration:.2f}")
else:
task_logger.failure(f"视频发布失败: {result.message}")
return result
except Exception as e:
duration = asyncio.get_event_loop().time() - start_time
error_msg = f"发布过程异常: {str(e)}"
task_logger.failure(error_msg)
return self.create_publish_result(
success=False,
message=error_msg,
task_id=task_logger.task_id,
account=account_info.username,
error_details={"exception": str(e), "type": type(e).__name__},
duration=duration
)
async def validate_content(self, content: Content) -> tuple[bool, str]:
"""验证抖音内容"""
# 基础验证
is_valid, error_msg = await super().validate_content(content)
if not is_valid:
return False, error_msg
# 抖音特定验证 - 只支持视频内容
if not isinstance(content, VideoContent):
return False, "抖音只支持视频内容"
# 检查视频文件
path = Path(content.video_path)
if not path.exists():
return False, f"视频文件不存在: {content.video_path}"
file_size = path.stat().st_size
if file_size > self.config.max_file_size:
return False, f"视频文件过大: {content.video_path} (最大支持 {self.config.max_file_size // (1024*1024*1024)}GB)"
# 检查视频格式
file_extension = path.suffix.lower().lstrip('.')
if file_extension not in self.config.supported_formats:
return False, f"不支持的视频格式: {file_extension},支持的格式: {', '.join(self.config.supported_formats)}"
# 检查视频时长(如果可用)
if content.duration and content.duration > self.config.max_duration:
return False, f"视频时长过长: {content.duration}秒 (最大支持 {self.config.max_duration}秒)"
return True, ""
async def _publish_video(
self,
page: Page,
content: VideoContent,
account_info: AccountInfo,
task_logger
) -> PublishResult:
"""发布视频到抖音"""
try:
task_logger.progress("开始发布视频")
# 导航到视频上传页面
upload_url = self.config.extra_config.get("upload_url")
if upload_url:
if not await self.auth.safe_goto(page, upload_url):
raise ElementNotFoundError("无法访问视频上传页面")
await asyncio.sleep(3)
# 上传视频
task_logger.progress("开始上传视频")
upload_success = await self._upload_video(page, content.video_path, task_logger)
if not upload_success:
raise UploadFailedError("视频上传失败")
# 填写视频标题
task_logger.progress("填写视频标题")
selectors = get_selectors(PlatformType.DOUYIN, "publish", "video")
title_selector = selectors.get("title_input")
if title_selector:
await self.human_behavior.human_type(page, title_selector, content.title)
await asyncio.sleep(1)
# 填写视频描述
if content.description:
task_logger.progress("填写视频描述")
desc_selector = selectors.get("description_input")
if desc_selector:
await self.human_behavior.human_type(page, desc_selector, content.description)
await asyncio.sleep(1)
# 添加标签
if content.tags:
task_logger.progress("添加标签")
await self._add_tags(page, content.tags, selectors.get("tag_input"))
# 发布视频
task_logger.progress("发布视频")
publish_success = await self._publish_video_final(page, task_logger)
if publish_success:
return self.create_publish_result(
success=True,
message="视频发布成功",
task_id=task_logger.task_id,
account=account_info.username
)
else:
return self.create_publish_result(
success=False,
message="视频发布失败",
task_id=task_logger.task_id,
account=account_info.username
)
except Exception as e:
task_logger.failure(f"视频发布异常: {e}")
return self.create_publish_result(
success=False,
message=f"视频发布失败: {str(e)}",
task_id=task_logger.task_id,
account=account_info.username,
error_details={"exception": str(e)}
)
async def _upload_video(
self,
page: Page,
video_path: str,
task_logger
) -> bool:
"""上传视频"""
try:
selectors = get_selectors(PlatformType.DOUYIN, "publish", "video")
upload_selector = selectors.get("video_upload")
if not upload_selector:
# 尝试其他可能的上传选择器
alternative_selectors = [
"input[type='file']",
"input[accept*='video']",
"[data-testid='video-upload']",
".upload-btn input",
"[class*='upload'] input",
".drag-upload-area input",
"[data-testid='upload-input']"
]
for selector in alternative_selectors:
try:
element = await page.wait_for_selector(selector, timeout=5000)
if element:
upload_selector = selector
break
except:
continue
if not upload_selector:
raise ElementNotFoundError("未找到视频上传元素")
# 找到file input元素
upload_input = await page.wait_for_selector(upload_selector, timeout=10000)
if not upload_input:
raise ElementNotFoundError("视频上传元素不可用")
# 上传视频文件
await upload_input.set_input_files(video_path)
task_logger.progress("已选择视频文件")
# 等待上传完成
wait_times = get_wait_times(PlatformType.DOUYIN, "publish")
upload_timeout = wait_times.get("video_upload", 180)
task_logger.progress("等待视频上传完成")
upload_complete = await self._wait_for_video_upload(page, upload_timeout)
if upload_complete:
task_logger.progress("视频上传完成")
return True
else:
task_logger.warning("视频上传超时")
return False
except Exception as e:
task_logger.error(f"视频上传异常: {e}")
return False
async def _wait_for_video_upload(self, page: Page, timeout: int = 180) -> bool:
"""等待视频上传完成"""
start_time = asyncio.get_event_loop().time()
# 可能的上传完成指示器
success_indicators = [
".upload-success",
"[data-testid='upload-success']",
".video-uploaded",
"[class*='success']",
".upload-complete",
".progress-100",
".upload-finish"
]
while asyncio.get_event_loop().time() - start_time < timeout:
try:
# 检查成功指示器
for indicator in success_indicators:
try:
element = await page.wait_for_selector(indicator, timeout=3000)
if element and await element.is_visible():
return True
except:
continue
# 检查进度条是否达到100%
progress_selectors = [
".upload-progress",
"[data-testid='upload-progress']",
".progress-bar",
".upload-progress-bar",
"[class*='progress']"
]
for selector in progress_selectors:
try:
element = await page.wait_for_selector(selector, timeout=2000)
if element:
# 检查进度是否完成
text = await element.inner_text()
if "100%" in text or "完成" in text or "上传完成" in text:
return True
# 检查进度条属性
style = await element.get_attribute("style")
if style and "width: 100%" in style:
return True
except:
continue
# 检查是否有错误提示
error_indicators = [
".upload-error",
"[data-testid='upload-error']",
".error-message",
"[class*='error']",
".upload-failed"
]
for indicator in error_indicators:
try:
element = await page.wait_for_selector(indicator, timeout=2000)
if element and await element.is_visible():
logger.warning(f"检测到上传错误: {indicator}")
return False
except:
continue
await asyncio.sleep(3)
except Exception:
await asyncio.sleep(3)
return False
async def _add_tags(self, page: Page, tags: List[str], tag_selector: Optional[str]):
"""添加标签"""
if not tags or not tag_selector:
return
try:
for tag in tags:
# 点击标签输入框
await self.human_behavior.human_click(page, tag_selector)
await asyncio.sleep(0.5)
# 输入标签(抖音使用#开头)
tag_text = tag if tag.startswith('#') else f"#{tag}"
await self.human_behavior.human_type(page, tag_selector, tag_text)
await asyncio.sleep(0.5)
# 按回车或空格确认标签
await page.keyboard.press("Enter")
await asyncio.sleep(0.5)
except Exception as e:
logger.warning(f"添加标签失败: {e}")
async def _publish_video_final(self, page: Page, task_logger) -> bool:
"""发布视频"""
try:
selectors = get_selectors(PlatformType.DOUYIN, "publish", "video")
publish_button_selector = selectors.get("publish_button")
if not publish_button_selector:
# 尝试其他可能的发布按钮选择器
alternative_selectors = [
"button[type='submit']",
"[data-testid='publish-btn']",
".publish-btn",
"button:has-text('发布')",
"button:has-text('提交')",
"button:has-text('立即发布')",
"[class*='publish'] button",
".submit-btn"
]
for selector in alternative_selectors:
try:
element = await page.wait_for_selector(selector, timeout=5000)
if element and await element.is_visible():
publish_button_selector = selector
break
except:
continue
if not publish_button_selector:
raise ElementNotFoundError("未找到发布按钮")
# 滚动到发布按钮
await page.scroll_into_view_if_needed(publish_button_selector)
await asyncio.sleep(1)
# 点击发布按钮
await self.human_behavior.human_click(page, publish_button_selector)
task_logger.progress("已点击发布按钮")
# 等待发布完成
wait_times = get_wait_times(PlatformType.DOUYIN, "publish")
publish_timeout = wait_times.get("publish_success", 15)
task_logger.progress("等待发布完成")
return await self._wait_for_publish_success(page, publish_timeout)
except Exception as e:
task_logger.error(f"发布视频失败: {e}")
return False
async def _wait_for_publish_success(self, page: Page, timeout: int = 15) -> bool:
"""等待发布成功"""
start_time = asyncio.get_event_loop().time()
# 可能的成功指示器
success_indicators = [
".publish-success",
"[data-testid='publish-success']",
".success-message",
"[class*='success']",
".upload-success",
"发布成功",
"提交成功",
"上传成功"
]
# 可能的页面跳转指示器
url_indicators = [
"manage",
"content",
"success"
]
while asyncio.get_event_loop().time() - start_time < timeout:
try:
# 检查成功指示器
for indicator in success_indicators:
if indicator.startswith('.') or indicator.startswith('[') or indicator.startswith('text='):
# CSS选择器或文本
try:
if indicator.startswith('text='):
if await page.locator(indicator).count() > 0:
return True
else:
element = await page.wait_for_selector(indicator, timeout=2000)
if element and await element.is_visible():
return True
except:
continue
# 检查URL变化
current_url = page.url.lower()
for indicator in url_indicators:
if indicator in current_url:
return True
await asyncio.sleep(1)
except Exception:
await asyncio.sleep(1)
return False
async def get_upload_progress(self, page: Page) -> float:
"""
获取上传进度
Args:
page: Playwright页面对象
Returns:
上传进度 (0-100)
"""
try:
progress_selectors = [
".upload-progress",
"[data-testid='upload-progress']",
".progress-bar",
".upload-progress-bar"
]
for selector in progress_selectors:
try:
element = await page.wait_for_selector(selector, timeout=2000)
if element:
# 尝试获取文本内容
text = await element.inner_text()
if '%' in text:
# 提取百分比数字
import re
match = re.search(r'(\d+)%', text)
if match:
return float(match.group(1))
# 尝试获取样式属性
style = await element.get_attribute("style")
if style and "width:" in style:
import re
match = re.search(r'width:\s*(\d+(?:\.\d+)?)%', style)
if match:
return float(match.group(1))
except:
continue
except Exception as e:
logger.debug(f"获取上传进度失败: {e}")
return 0.0
async def cancel_upload(self, page: Page) -> bool:
"""
取消上传
Args:
page: Playwright页面对象
Returns:
取消是否成功
"""
try:
cancel_selectors = [
".cancel-upload",
"[data-testid='cancel-upload']",
".upload-cancel",
"button:has-text('取消')",
"button:has-text('停止')"
]
for selector in cancel_selectors:
try:
element = await page.wait_for_selector(selector, timeout=3000)
if element and await element.is_visible():
await element.click()
logger.info("上传已取消")
return True
except:
continue
return False
except Exception as e:
logger.error(f"取消上传失败: {e}")
return False