2025-11-12 00:28:07 +08:00

473 lines
14 KiB
Python

"""
核心发布管理器
统一的社交媒体发布管理器。
"""
import asyncio
from typing import Dict, List, Optional, Type, Union
from datetime import datetime
from .models import (
PlatformType,
AccountInfo,
Content,
PublishTask,
PublishResult,
UploadStatus,
PublishStatus
)
from ..platforms.base_adapter import BaseAdapter
from ..platforms.xiaohongshu.xiaohongshu_adapter import XiaoHongShuAdapter
from ..platforms.douyin.douyin_adapter import DouyinAdapter
from ..session_manager import SessionManager
from ..utils.browser import browser_manager
from ..utils.logger import get_logger, get_task_logger
from ..utils.exceptions import (
SocialMediaError,
PlatformNotSupportedError,
ValidationError,
AuthenticationError
)
logger = get_logger(__name__)
class Publisher:
"""统一的发布管理器"""
def __init__(self, headless: bool = False, debug: bool = False):
"""
初始化发布管理器
Args:
headless: 是否使用无头模式
debug: 是否开启调试模式
"""
self.headless = headless
self.debug = debug
self.session_manager = SessionManager()
self.adapters: Dict[PlatformType, BaseAdapter] = {}
self._lock = asyncio.Lock()
# 初始化适配器
self._initialize_adapters()
def _initialize_adapters(self):
"""初始化平台适配器"""
self.adapters = {
PlatformType.XIAOHONGSHU: XiaoHongShuAdapter(),
PlatformType.DOUYIN: DouyinAdapter()
}
logger.info(f"已初始化 {len(self.adapters)} 个平台适配器")
async def setup_platform(self, platform: Union[str, PlatformType], account_name: str) -> bool:
"""
设置平台账号
Args:
platform: 平台名称或类型
account_name: 账号名称
Returns:
设置是否成功
"""
try:
if isinstance(platform, str):
platform = PlatformType(platform.lower())
if platform not in self.adapters:
raise PlatformNotSupportedError(f"不支持的平台: {platform}")
adapter = self.adapters[platform]
account_info = AccountInfo(
platform=platform,
username=account_name,
cookie_file=f"{account_name}.json"
)
logger.info(f"设置平台账号: {platform.value}/{account_name}")
# 执行登录
success = await adapter.login(account_info, headless=self.headless)
if success:
# 保存会话
await self.session_manager.save_session(platform, account_name)
logger.success(f"平台设置成功: {platform.value}/{account_name}")
else:
logger.error(f"平台设置失败: {platform.value}/{account_name}")
return success
except Exception as e:
logger.error(f"平台设置异常: {e}")
return False
async def publish(
self,
platform: Union[str, PlatformType],
content: Content,
account_name: str
) -> PublishResult:
"""
发布内容到指定平台
Args:
platform: 平台名称或类型
content: 要发布的内容
account_name: 账号名称
Returns:
发布结果
"""
task_id = f"{platform}_{account_name}_{int(datetime.now().timestamp())}"
task_logger = get_task_logger(task_id, str(platform), account_name)
start_time = asyncio.get_event_loop().time()
try:
# 转换平台类型
if isinstance(platform, str):
platform = PlatformType(platform.lower())
# 验证平台支持
if platform not in self.adapters:
raise PlatformNotSupportedError(f"不支持的平台: {platform}")
adapter = self.adapters[platform]
account_info = AccountInfo(
platform=platform,
username=account_name,
cookie_file=f"{account_name}.json"
)
task_logger.start(f"开始发布任务: {content.title}")
# 获取或创建浏览器会话
page = await self.session_manager.get_session(platform, account_name, headless=self.headless)
# 验证登录状态
if not await adapter.check_login_status(page):
task_logger.warning("登录状态失效,尝试重新登录")
login_success = await adapter.login(account_info, headless=self.headless)
if not login_success:
raise AuthenticationError(f"登录失败: {platform.value}/{account_name}")
# 重新获取页面
page = await self.session_manager.get_session(platform, account_name, headless=self.headless)
# 发布内容
result = await adapter.publish_content(page, content, account_info)
# 更新任务信息
result.task_id = task_id
# 计算耗时
duration = asyncio.get_event_loop().time() - start_time
result.duration = duration
if result.success:
task_logger.success(f"任务完成,耗时: {duration:.2f}")
else:
task_logger.failure(f"任务失败: {result.message}")
return result
except Exception as e:
duration = asyncio.get_event_loop().time() - start_time
error_msg = f"发布任务异常: {str(e)}"
task_logger.failure(error_msg)
return PublishResult(
task_id=task_id,
platform=platform if isinstance(platform, PlatformType) else PlatformType(platform),
account=account_name,
success=False,
message=error_msg,
error_details={"exception": str(e), "type": type(e).__name__},
duration=duration
)
async def batch_publish(self, tasks: List[PublishTask]) -> List[PublishResult]:
"""
批量发布内容
Args:
tasks: 发布任务列表
Returns:
发布结果列表
"""
if not tasks:
return []
logger.info(f"开始批量发布任务: {len(tasks)} 个任务")
results = []
# 使用信号量控制并发数
semaphore = asyncio.Semaphore(3) # 最多同时执行3个任务
async def _publish_task(task: PublishTask) -> PublishResult:
async with semaphore:
try:
# 转换为发布方法调用
result = await self.publish(
platform=task.platform,
content=task.content,
account_name=task.account.username
)
# 如果失败且可以重试
if not result.success and task.should_retry:
logger.warning(f"任务失败,准备重试: {task.id} (第{task.retry_count + 1}次)")
task.increment_retry()
await asyncio.sleep(2 ** task.retry_count) # 指数退避
return await _publish_task(task)
return result
except Exception as e:
logger.error(f"批量发布任务异常: {task.id} - {e}")
return PublishResult(
task_id=task.id,
platform=task.platform,
account=task.account.username,
success=False,
message=f"任务执行异常: {str(e)}",
error_details={"exception": str(e)}
)
# 并发执行所有任务
results = await asyncio.gather(
*[_publish_task(task) for task in tasks],
return_exceptions=True
)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(PublishResult(
task_id=tasks[i].id,
platform=tasks[i].platform,
account=tasks[i].account.username,
success=False,
message=f"任务执行异常: {str(result)}",
error_details={"exception": str(result)}
))
else:
processed_results.append(result)
# 统计结果
success_count = sum(1 for r in processed_results if r.success)
logger.info(f"批量发布完成: 成功 {success_count}/{len(processed_results)}")
return processed_results
async def test_login(self, platform: Union[str, PlatformType], account_name: str) -> bool:
"""
测试登录状态
Args:
platform: 平台名称或类型
account_name: 账号名称
Returns:
登录是否有效
"""
try:
if isinstance(platform, str):
platform = PlatformType(platform.lower())
if platform not in self.adapters:
raise PlatformNotSupportedError(f"不支持的平台: {platform}")
adapter = self.adapters[platform]
account_info = AccountInfo(
platform=platform,
username=account_name,
cookie_file=f"{account_name}.json"
)
return await adapter.get_authenticator().test_login(account_info)
except Exception as e:
logger.error(f"登录状态测试失败: {e}")
return False
async def get_supported_platforms(self) -> List[PlatformType]:
"""
获取支持的平台列表
Returns:
支持的平台类型列表
"""
return list(self.adapters.keys())
async def validate_content(
self,
platform: Union[str, PlatformType],
content: Content
) -> tuple[bool, str]:
"""
验证内容是否符合平台要求
Args:
platform: 平台名称或类型
content: 要验证的内容
Returns:
(是否有效, 错误信息)
"""
try:
if isinstance(platform, str):
platform = PlatformType(platform.lower())
if platform not in self.adapters:
raise PlatformNotSupportedError(f"不支持的平台: {platform}")
adapter = self.adapters[platform]
return await adapter.validate_content(content)
except Exception as e:
return False, f"内容验证异常: {str(e)}"
async def add_custom_adapter(self, platform: PlatformType, adapter: BaseAdapter):
"""
添加自定义适配器
Args:
platform: 平台类型
adapter: 适配器实例
"""
self.adapters[platform] = adapter
logger.info(f"已添加自定义适配器: {platform.value}")
async def remove_adapter(self, platform: PlatformType):
"""
移除平台适配器
Args:
platform: 平台类型
"""
if platform in self.adapters:
del self.adapters[platform]
logger.info(f"已移除适配器: {platform.value}")
async def get_adapter(self, platform: PlatformType) -> Optional[BaseAdapter]:
"""
获取平台适配器
Args:
platform: 平台类型
Returns:
适配器实例或None
"""
return self.adapters.get(platform)
async def cleanup(self):
"""清理资源"""
try:
async with self._lock:
await self.session_manager.cleanup()
await browser_manager.cleanup()
logger.info("发布管理器资源清理完成")
except Exception as e:
logger.error(f"资源清理失败: {e}")
async def __aenter__(self):
"""异步上下文管理器入口"""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
await self.cleanup()
# 便捷函数
async def create_publisher(headless: bool = False, debug: bool = False) -> Publisher:
"""
创建发布器实例
Args:
headless: 是否使用无头模式
debug: 是否开启调试模式
Returns:
发布器实例
"""
return Publisher(headless=headless, debug=debug)
async def publish_to_xiaohongshu(
title: str,
description: str,
images: List[str],
tags: Optional[List[str]] = None,
account_name: str = "default",
headless: bool = False
) -> PublishResult:
"""
快速发布小红书图文笔记
Args:
title: 标题
description: 描述
images: 图片路径列表
tags: 标签列表
account_name: 账号名称
headless: 是否使用无头模式
Returns:
发布结果
"""
from .models import ImageNote
content = ImageNote(
title=title,
description=description,
images=images,
tags=tags or []
)
async with Publisher(headless=headless) as publisher:
return await publisher.publish(PlatformType.XIAOHONGSHU, content, account_name)
async def publish_to_douyin(
title: str,
description: str,
video_path: str,
tags: Optional[List[str]] = None,
account_name: str = "default",
headless: bool = False
) -> PublishResult:
"""
快速发布抖音视频
Args:
title: 标题
description: 描述
video_path: 视频文件路径
tags: 标签列表
account_name: 账号名称
headless: 是否使用无头模式
Returns:
发布结果
"""
from .models import VideoContent
content = VideoContent(
title=title,
description=description,
video_path=video_path,
tags=tags or []
)
async with Publisher(headless=headless) as publisher:
return await publisher.publish(PlatformType.DOUYIN, content, account_name)