473 lines
14 KiB
Python
473 lines
14 KiB
Python
"""
|
|
核心发布管理器
|
|
统一的社交媒体发布管理器。
|
|
"""
|
|
import asyncio
|
|
from typing import Dict, List, Optional, Type, Union
|
|
from datetime import datetime
|
|
|
|
from .models import (
|
|
PlatformType,
|
|
AccountInfo,
|
|
Content,
|
|
PublishTask,
|
|
PublishResult,
|
|
UploadStatus,
|
|
PublishStatus
|
|
)
|
|
from ..platforms.base_adapter import BaseAdapter
|
|
from ..platforms.xiaohongshu.xiaohongshu_adapter import XiaoHongShuAdapter
|
|
from ..platforms.douyin.douyin_adapter import DouyinAdapter
|
|
from ..session_manager import SessionManager
|
|
from ..utils.browser import browser_manager
|
|
from ..utils.logger import get_logger, get_task_logger
|
|
from ..utils.exceptions import (
|
|
SocialMediaError,
|
|
PlatformNotSupportedError,
|
|
ValidationError,
|
|
AuthenticationError
|
|
)
|
|
|
|
# Module-level logger for this file; Publisher.publish additionally obtains a
# per-task logger via get_task_logger.
logger = get_logger(__name__)
|
|
|
|
|
|
class Publisher:
    """Unified manager that publishes content through per-platform adapters.

    One adapter instance per ``PlatformType`` is kept in ``self.adapters``.
    A ``SessionManager`` supplies the page object that is handed to the
    adapter when publishing. The class can be used as an async context
    manager; leaving the context calls :meth:`cleanup`.
    """

    def __init__(self, headless: bool = False, debug: bool = False):
        """
        Create the manager and register the built-in adapters.

        Args:
            headless: Forwarded as ``headless=`` to adapter logins and
                session lookups.
            debug: Stored on the instance; not read anywhere in this class.
        """
        self.headless = headless
        self.debug = debug
        self.session_manager = SessionManager()
        self.adapters: Dict[PlatformType, BaseAdapter] = {}
        self._lock = asyncio.Lock()

        # Register the built-in platform adapters.
        self._initialize_adapters()

    def _initialize_adapters(self):
        """Replace ``self.adapters`` with the built-in adapter set."""
        self.adapters = {
            PlatformType.XIAOHONGSHU: XiaoHongShuAdapter(),
            PlatformType.DOUYIN: DouyinAdapter()
        }

        logger.info(f"已初始化 {len(self.adapters)} 个平台适配器")

    @staticmethod
    def _coerce_platform(platform: Union[str, PlatformType]) -> PlatformType:
        """Return ``platform`` as a ``PlatformType``, lower-casing string input."""
        if isinstance(platform, str):
            return PlatformType(platform.lower())
        return platform

    def _resolve_adapter(
        self,
        platform: Union[str, PlatformType]
    ) -> tuple[PlatformType, BaseAdapter]:
        """Return ``(platform_type, adapter)`` or raise if no adapter is registered.

        Raises:
            PlatformNotSupportedError: The resolved platform has no entry in
                ``self.adapters``.
        """
        platform = self._coerce_platform(platform)
        if platform not in self.adapters:
            raise PlatformNotSupportedError(f"不支持的平台: {platform}")
        return platform, self.adapters[platform]

    @staticmethod
    def _build_account(platform: PlatformType, account_name: str) -> AccountInfo:
        """Build the ``AccountInfo`` for an account; cookie file is ``<account_name>.json``."""
        return AccountInfo(
            platform=platform,
            username=account_name,
            cookie_file=f"{account_name}.json"
        )

    @classmethod
    def _platform_for_result(cls, platform):
        """Best-effort platform value for a failure ``PublishResult``.

        Falls back to the caller's raw value when it cannot be converted, so
        that building the failure result does not raise a second time.
        NOTE(review): confirm ``PublishResult`` accepts a non-enum platform.
        """
        try:
            return cls._coerce_platform(platform)
        except ValueError:
            return platform

    async def setup_platform(self, platform: Union[str, PlatformType], account_name: str) -> bool:
        """
        Log in to a platform account and save its session on success.

        Args:
            platform: Platform name or ``PlatformType``.
            account_name: Account name; also used for the cookie file name.

        Returns:
            The value returned by the adapter's ``login``; ``False`` if any
            exception occurs (the exception is logged, not raised).
        """
        try:
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account(platform, account_name)

            logger.info(f"设置平台账号: {platform.value}/{account_name}")

            # Perform the login through the adapter.
            success = await adapter.login(account_info, headless=self.headless)

            if success:
                # Persist the session for later use.
                await self.session_manager.save_session(platform, account_name)
                logger.success(f"平台设置成功: {platform.value}/{account_name}")
            else:
                logger.error(f"平台设置失败: {platform.value}/{account_name}")

            return success

        except Exception as e:
            logger.error(f"平台设置异常: {e}")
            return False

    async def publish(
        self,
        platform: Union[str, PlatformType],
        content: Content,
        account_name: str
    ) -> PublishResult:
        """
        Publish content to one platform.

        Args:
            platform: Platform name or ``PlatformType``.
            content: Content to publish.
            account_name: Account name.

        Returns:
            The adapter's ``PublishResult`` with ``task_id`` and ``duration``
            filled in. If anything raises ``Exception``, a failed
            ``PublishResult`` describing the exception is returned instead.
        """
        task_id = f"{platform}_{account_name}_{int(datetime.now().timestamp())}"
        task_logger = get_task_logger(task_id, str(platform), account_name)

        # Monotonic loop clock; we are inside a coroutine, so a loop is running.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Normalize the platform and make sure an adapter exists for it.
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account(platform, account_name)

            task_logger.start(f"开始发布任务: {content.title}")

            # Get (or create) the browser session page.
            page = await self.session_manager.get_session(platform, account_name, headless=self.headless)

            # Verify the login state; re-login once if it is no longer valid.
            if not await adapter.check_login_status(page):
                task_logger.warning("登录状态失效,尝试重新登录")
                login_success = await adapter.login(account_info, headless=self.headless)

                if not login_success:
                    raise AuthenticationError(f"登录失败: {platform.value}/{account_name}")

                # Fetch the page again after logging in.
                page = await self.session_manager.get_session(platform, account_name, headless=self.headless)

            # Publish through the adapter.
            result = await adapter.publish_content(page, content, account_info)

            # Attach task bookkeeping to the adapter's result.
            result.task_id = task_id
            duration = loop.time() - start_time
            result.duration = duration

            if result.success:
                task_logger.success(f"任务完成,耗时: {duration:.2f}秒")
            else:
                task_logger.failure(f"任务失败: {result.message}")

            return result

        except Exception as e:
            duration = loop.time() - start_time
            error_msg = f"发布任务异常: {str(e)}"
            task_logger.failure(error_msg)

            return PublishResult(
                task_id=task_id,
                # The platform may be the very thing that failed to convert;
                # do not let the failure path raise again.
                platform=self._platform_for_result(platform),
                account=account_name,
                success=False,
                message=error_msg,
                error_details={"exception": str(e), "type": type(e).__name__},
                duration=duration
            )

    async def batch_publish(
        self,
        tasks: List[PublishTask],
        max_concurrency: int = 3
    ) -> List[PublishResult]:
        """
        Publish several tasks concurrently, retrying failed ones.

        Args:
            tasks: Publish tasks to run.
            max_concurrency: Maximum number of publish attempts running at
                the same time (default 3).

        Returns:
            One ``PublishResult`` per task, in the same order as ``tasks``.

        Raises:
            ValueError: ``max_concurrency`` is smaller than 1.
        """
        if not tasks:
            return []

        if max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")

        logger.info(f"开始批量发布任务: {len(tasks)} 个任务")

        # Semaphore bounds the number of simultaneous publish attempts.
        semaphore = asyncio.Semaphore(max_concurrency)

        def _failure(task: PublishTask, error: BaseException) -> PublishResult:
            """Build the failed result reported for a task that raised."""
            return PublishResult(
                task_id=task.id,
                platform=task.platform,
                account=task.account.username,
                success=False,
                message=f"任务执行异常: {str(error)}",
                error_details={"exception": str(error)}
            )

        async def _run_task(task: PublishTask) -> PublishResult:
            try:
                while True:
                    # Hold a permit only for the attempt itself. Retrying
                    # while still holding it (nested acquire) can deadlock
                    # once every permit is owned by a task waiting to retry.
                    async with semaphore:
                        result = await self.publish(
                            platform=task.platform,
                            content=task.content,
                            account_name=task.account.username
                        )

                    if result.success or not task.should_retry:
                        return result

                    logger.warning(f"任务失败,准备重试: {task.id} (第{task.retry_count + 1}次)")
                    task.increment_retry()
                    await asyncio.sleep(2 ** task.retry_count)  # exponential back-off

            except Exception as e:
                logger.error(f"批量发布任务异常: {task.id} - {e}")
                return _failure(task, e)

        # Run all tasks concurrently; exceptions come back as values.
        outcomes = await asyncio.gather(
            *[_run_task(task) for task in tasks],
            return_exceptions=True
        )

        # Convert any exception objects (including cancellations, which are
        # BaseException) into failed results.
        processed_results = [
            _failure(task, outcome) if isinstance(outcome, BaseException) else outcome
            for task, outcome in zip(tasks, outcomes)
        ]

        # Summarize.
        success_count = sum(1 for r in processed_results if r.success)
        logger.info(f"批量发布完成: 成功 {success_count}/{len(processed_results)}")

        return processed_results

    async def test_login(self, platform: Union[str, PlatformType], account_name: str) -> bool:
        """
        Check whether the stored login for an account is still valid.

        Delegates to ``adapter.get_authenticator().test_login(account_info)``.

        Args:
            platform: Platform name or ``PlatformType``.
            account_name: Account name.

        Returns:
            The authenticator's answer; ``False`` if any exception occurs
            (the exception is logged, not raised).
        """
        try:
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account(platform, account_name)

            return await adapter.get_authenticator().test_login(account_info)

        except Exception as e:
            logger.error(f"登录状态测试失败: {e}")
            return False

    async def get_supported_platforms(self) -> List[PlatformType]:
        """
        List the platforms that currently have an adapter registered.

        Returns:
            The keys of ``self.adapters`` as a list.
        """
        return list(self.adapters.keys())

    async def validate_content(
        self,
        platform: Union[str, PlatformType],
        content: Content
    ) -> tuple[bool, str]:
        """
        Validate content against a platform's requirements.

        Args:
            platform: Platform name or ``PlatformType``.
            content: Content to validate.

        Returns:
            Whatever the adapter's ``validate_content`` returns (annotated as
            ``(is_valid, error_message)``); ``(False, message)`` if any
            exception occurs.
        """
        try:
            _, adapter = self._resolve_adapter(platform)
            return await adapter.validate_content(content)

        except Exception as e:
            return False, f"内容验证异常: {str(e)}"

    async def add_custom_adapter(self, platform: PlatformType, adapter: BaseAdapter):
        """
        Register (or replace) the adapter for a platform.

        Args:
            platform: Platform type.
            adapter: Adapter instance.
        """
        self.adapters[platform] = adapter
        logger.info(f"已添加自定义适配器: {platform.value}")

    async def remove_adapter(self, platform: PlatformType):
        """
        Remove the adapter for a platform; does nothing if none is registered.

        Args:
            platform: Platform type.
        """
        if platform in self.adapters:
            del self.adapters[platform]
            logger.info(f"已移除适配器: {platform.value}")

    async def get_adapter(self, platform: PlatformType) -> Optional[BaseAdapter]:
        """
        Look up the adapter for a platform.

        Args:
            platform: Platform type.

        Returns:
            The adapter instance, or ``None`` if none is registered.
        """
        return self.adapters.get(platform)

    async def cleanup(self):
        """Clean up the session manager and the browser manager.

        Both cleanups run while holding ``self._lock``. Exceptions are
        logged and not re-raised.
        """
        try:
            async with self._lock:
                await self.session_manager.cleanup()
                await browser_manager.cleanup()
                logger.info("发布管理器资源清理完成")

        except Exception as e:
            logger.error(f"资源清理失败: {e}")

    async def __aenter__(self):
        """Enter the async context; returns this publisher."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context; runs :meth:`cleanup`."""
        await self.cleanup()
|
|
|
|
|
|
# Convenience helpers
|
|
async def create_publisher(headless: bool = False, debug: bool = False) -> Publisher:
    """
    Build a new publisher instance.

    Args:
        headless: Passed to ``Publisher`` as ``headless``.
        debug: Passed to ``Publisher`` as ``debug``.

    Returns:
        The newly constructed ``Publisher``.
    """
    publisher = Publisher(headless=headless, debug=debug)
    return publisher
|
|
|
|
|
|
async def publish_to_xiaohongshu(
    title: str,
    description: str,
    images: List[str],
    tags: Optional[List[str]] = None,
    account_name: str = "default",
    headless: bool = False
) -> PublishResult:
    """
    Publish an image note to Xiaohongshu in one call.

    Builds an ``ImageNote`` from the arguments and publishes it through a
    temporary ``Publisher`` used as an async context manager.

    Args:
        title: Note title.
        description: Note description.
        images: List of image paths.
        tags: Optional list of tags; an empty list is used when omitted.
        account_name: Account name.
        headless: Passed to ``Publisher`` as ``headless``.

    Returns:
        The ``PublishResult`` returned by ``Publisher.publish``.
    """
    from .models import ImageNote

    note_tags = tags if tags else []
    note = ImageNote(
        title=title,
        description=description,
        images=images,
        tags=note_tags
    )

    async with Publisher(headless=headless) as xhs_publisher:
        outcome = await xhs_publisher.publish(PlatformType.XIAOHONGSHU, note, account_name)
    return outcome
|
|
|
|
|
|
async def publish_to_douyin(
    title: str,
    description: str,
    video_path: str,
    tags: Optional[List[str]] = None,
    account_name: str = "default",
    headless: bool = False
) -> PublishResult:
    """
    Publish a video to Douyin in one call.

    Builds a ``VideoContent`` from the arguments and publishes it through a
    temporary ``Publisher`` used as an async context manager.

    Args:
        title: Video title.
        description: Video description.
        video_path: Path of the video file.
        tags: Optional list of tags; an empty list is used when omitted.
        account_name: Account name.
        headless: Passed to ``Publisher`` as ``headless``.

    Returns:
        The ``PublishResult`` returned by ``Publisher.publish``.
    """
    from .models import VideoContent

    video_tags = tags if tags else []
    video = VideoContent(
        title=title,
        description=description,
        video_path=video_path,
        tags=video_tags
    )

    async with Publisher(headless=headless) as douyin_publisher:
        outcome = await douyin_publisher.publish(PlatformType.DOUYIN, video, account_name)
    return outcome