"""
Core publishing manager.

A unified manager for publishing content to social media platforms.
"""

import asyncio
from typing import Dict, List, Optional, Type, Union
from datetime import datetime

from .models import (
    PlatformType, AccountInfo, Content, PublishTask, PublishResult,
    UploadStatus, PublishStatus
)
from ..platforms.base_adapter import BaseAdapter
from ..platforms.xiaohongshu.xiaohongshu_adapter import XiaoHongShuAdapter
from ..platforms.douyin.douyin_adapter import DouyinAdapter
from ..session_manager import SessionManager
from ..utils.browser import browser_manager
from ..utils.logger import get_logger, get_task_logger
from ..utils.exceptions import (
    SocialMediaError, PlatformNotSupportedError, ValidationError,
    AuthenticationError
)

logger = get_logger(__name__)


class Publisher:
    """Unified publishing manager that dispatches to per-platform adapters."""

    def __init__(self, headless: bool = False, debug: bool = False):
        """
        Initialize the publishing manager.

        Args:
            headless: Whether to run the browser in headless mode.
            debug: Whether debug mode is enabled (stored on the instance).
        """
        self.headless = headless
        self.debug = debug
        self.session_manager = SessionManager()
        self.adapters: Dict[PlatformType, BaseAdapter] = {}
        self._lock = asyncio.Lock()

        # Register the built-in adapters.
        self._initialize_adapters()

    def _initialize_adapters(self):
        """Create the built-in platform adapters and store them by platform."""
        self.adapters = {
            PlatformType.XIAOHONGSHU: XiaoHongShuAdapter(),
            PlatformType.DOUYIN: DouyinAdapter()
        }
        logger.info(f"已初始化 {len(self.adapters)} 个平台适配器")

    def _resolve_adapter(
        self, platform: Union[str, PlatformType]
    ) -> tuple[PlatformType, BaseAdapter]:
        """
        Normalize ``platform`` and look up its registered adapter.

        Args:
            platform: Platform name (lower-cased before conversion) or type.

        Returns:
            A ``(platform_type, adapter)`` pair.

        Raises:
            ValueError: If a string does not name a ``PlatformType`` member.
            PlatformNotSupportedError: If no adapter is registered for it.
        """
        if isinstance(platform, str):
            platform = PlatformType(platform.lower())

        if platform not in self.adapters:
            raise PlatformNotSupportedError(f"不支持的平台: {platform}")

        return platform, self.adapters[platform]

    @staticmethod
    def _build_account_info(platform: PlatformType, account_name: str) -> AccountInfo:
        """Build the ``AccountInfo`` for an account; cookies go in ``<name>.json``."""
        return AccountInfo(
            platform=platform,
            username=account_name,
            cookie_file=f"{account_name}.json"
        )

    @staticmethod
    def _task_error_result(task: PublishTask, error: BaseException) -> PublishResult:
        """Build the failed ``PublishResult`` for a batch task that raised."""
        return PublishResult(
            task_id=task.id,
            platform=task.platform,
            account=task.account.username,
            success=False,
            message=f"任务执行异常: {str(error)}",
            error_details={"exception": str(error)}
        )

    async def setup_platform(self, platform: Union[str, PlatformType], account_name: str) -> bool:
        """
        Log an account in on a platform and save its session on success.

        Args:
            platform: Platform name or type.
            account_name: Account name.

        Returns:
            True if the login succeeded; False on login failure or on any
            exception (which is logged rather than raised).
        """
        try:
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account_info(platform, account_name)

            logger.info(f"设置平台账号: {platform.value}/{account_name}")

            # Perform the login.
            success = await adapter.login(account_info, headless=self.headless)

            if success:
                # Persist the session only after a successful login.
                await self.session_manager.save_session(platform, account_name)
                logger.success(f"平台设置成功: {platform.value}/{account_name}")
            else:
                logger.error(f"平台设置失败: {platform.value}/{account_name}")

            return success

        except Exception as e:
            logger.error(f"平台设置异常: {e}")
            return False

    async def publish(
        self,
        platform: Union[str, PlatformType],
        content: Content,
        account_name: str
    ) -> PublishResult:
        """
        Publish content to the given platform.

        Args:
            platform: Platform name or type.
            content: The content to publish.
            account_name: Account name.

        Returns:
            The publish result. Failures inside the publish flow are reported
            as a result with ``success=False`` rather than raised.

        Raises:
            ValueError: If ``platform`` cannot be converted to a
                ``PlatformType``; in that case no valid platform exists to
                attach to a failure result.
        """
        task_id = f"{platform}_{account_name}_{int(datetime.now().timestamp())}"
        task_logger = get_task_logger(task_id, str(platform), account_name)

        # We are inside a coroutine, so a running loop always exists.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        try:
            # Normalize the platform and make sure an adapter is registered.
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account_info(platform, account_name)

            task_logger.start(f"开始发布任务: {content.title}")

            # Get or create the browser session.
            page = await self.session_manager.get_session(
                platform, account_name, headless=self.headless
            )

            # Verify the login state; re-login once if it has expired.
            if not await adapter.check_login_status(page):
                task_logger.warning("登录状态失效,尝试重新登录")
                login_success = await adapter.login(account_info, headless=self.headless)
                if not login_success:
                    raise AuthenticationError(f"登录失败: {platform.value}/{account_name}")

                # Fetch the page again after the re-login.
                page = await self.session_manager.get_session(
                    platform, account_name, headless=self.headless
                )

            # Publish the content.
            result = await adapter.publish_content(page, content, account_info)

            # Attach task bookkeeping to the adapter's result.
            result.task_id = task_id
            duration = loop.time() - start_time
            result.duration = duration

            if result.success:
                task_logger.success(f"任务完成,耗时: {duration:.2f}秒")
            else:
                task_logger.failure(f"任务失败: {result.message}")

            return result

        except Exception as e:
            duration = loop.time() - start_time
            error_msg = f"发布任务异常: {str(e)}"
            task_logger.failure(error_msg)

            # NOTE: `platform` is still the caller's raw value only when
            # resolution itself failed; the conversion below then raises
            # ValueError, which propagates to the caller.
            return PublishResult(
                task_id=task_id,
                platform=platform if isinstance(platform, PlatformType) else PlatformType(platform),
                account=account_name,
                success=False,
                message=error_msg,
                error_details={"exception": str(e), "type": type(e).__name__},
                duration=duration
            )

    async def batch_publish(
        self,
        tasks: List[PublishTask],
        max_concurrency: int = 3
    ) -> List[PublishResult]:
        """
        Publish a batch of tasks concurrently, retrying failed tasks.

        Args:
            tasks: The publish tasks to run.
            max_concurrency: Maximum number of publish attempts running at
                the same time (defaults to 3).

        Returns:
            One result per task, in the same order as ``tasks``.

        Raises:
            ValueError: If ``max_concurrency`` is less than 1.
        """
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")

        if not tasks:
            return []

        logger.info(f"开始批量发布任务: {len(tasks)} 个任务")

        # The semaphore bounds the number of in-flight publish attempts.
        semaphore = asyncio.Semaphore(max_concurrency)

        async def _publish_task(task: PublishTask) -> PublishResult:
            try:
                while True:
                    # Hold a slot only for the attempt itself. Retrying
                    # iteratively (instead of recursing while holding the
                    # slot) means a task never waits for a second slot, which
                    # would deadlock once every slot is held by a retrying
                    # task; it also frees the slot during backoff.
                    async with semaphore:
                        result = await self.publish(
                            platform=task.platform,
                            content=task.content,
                            account_name=task.account.username
                        )

                    if result.success or not task.should_retry:
                        return result

                    logger.warning(f"任务失败,准备重试: {task.id} (第{task.retry_count + 1}次)")
                    task.increment_retry()
                    await asyncio.sleep(2 ** task.retry_count)  # exponential backoff

            except Exception as e:
                logger.error(f"批量发布任务异常: {task.id} - {e}")
                return self._task_error_result(task, e)

        # Run all tasks concurrently; exceptions come back as result values.
        results = await asyncio.gather(
            *[_publish_task(task) for task in tasks],
            return_exceptions=True
        )

        # Convert any returned exception into a failed result. BaseException
        # is checked because a cancelled child yields CancelledError, which
        # is not an Exception subclass.
        processed_results = [
            self._task_error_result(task, result)
            if isinstance(result, BaseException) else result
            for task, result in zip(tasks, results)
        ]

        # Summarize the outcome.
        success_count = sum(1 for r in processed_results if r.success)
        logger.info(f"批量发布完成: 成功 {success_count}/{len(processed_results)}")

        return processed_results

    async def test_login(self, platform: Union[str, PlatformType], account_name: str) -> bool:
        """
        Test whether an account's login is still valid.

        Args:
            platform: Platform name or type.
            account_name: Account name.

        Returns:
            The authenticator's verdict, or False if any exception occurs
            (the exception is logged rather than raised).
        """
        try:
            platform, adapter = self._resolve_adapter(platform)
            account_info = self._build_account_info(platform, account_name)

            return await adapter.get_authenticator().test_login(account_info)

        except Exception as e:
            logger.error(f"登录状态测试失败: {e}")
            return False

    async def get_supported_platforms(self) -> List[PlatformType]:
        """
        Return the platforms that currently have a registered adapter.

        Returns:
            List of supported platform types.
        """
        return list(self.adapters.keys())

    async def validate_content(
        self,
        platform: Union[str, PlatformType],
        content: Content
    ) -> tuple[bool, str]:
        """
        Validate content against the platform adapter's requirements.

        Args:
            platform: Platform name or type.
            content: The content to validate.

        Returns:
            ``(is_valid, error_message)``; any exception is reported as
            ``(False, message)`` rather than raised.
        """
        try:
            platform, adapter = self._resolve_adapter(platform)
            return await adapter.validate_content(content)

        except Exception as e:
            return False, f"内容验证异常: {str(e)}"

    async def add_custom_adapter(self, platform: PlatformType, adapter: BaseAdapter):
        """
        Register a custom adapter, replacing any existing one for the platform.

        Args:
            platform: Platform type.
            adapter: Adapter instance.
        """
        self.adapters[platform] = adapter
        logger.info(f"已添加自定义适配器: {platform.value}")

    async def remove_adapter(self, platform: PlatformType):
        """
        Remove the adapter registered for a platform, if there is one.

        Args:
            platform: Platform type.
        """
        if platform in self.adapters:
            del self.adapters[platform]
            logger.info(f"已移除适配器: {platform.value}")

    async def get_adapter(self, platform: PlatformType) -> Optional[BaseAdapter]:
        """
        Look up the adapter registered for a platform.

        Args:
            platform: Platform type.

        Returns:
            The adapter instance, or None if none is registered.
        """
        return self.adapters.get(platform)

    async def cleanup(self):
        """
        Release session and browser resources.

        Each cleanup step runs independently so that a failure in one does
        not leave the other resource unreleased. Errors are logged, never
        raised.
        """
        try:
            async with self._lock:
                failed = False
                for release in (self.session_manager.cleanup, browser_manager.cleanup):
                    try:
                        await release()
                    except Exception as e:
                        failed = True
                        logger.error(f"资源清理失败: {e}")

                if not failed:
                    logger.info("发布管理器资源清理完成")
        except Exception as e:
            logger.error(f"资源清理失败: {e}")

    async def __aenter__(self):
        """Async context manager entry: return this publisher."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release resources."""
        await self.cleanup()


# Convenience functions

async def create_publisher(headless: bool = False, debug: bool = False) -> Publisher:
    """
    Create a publisher instance.

    Args:
        headless: Whether to run the browser in headless mode.
        debug: Whether debug mode is enabled.

    Returns:
        The publisher instance.
    """
    return Publisher(headless=headless, debug=debug)


async def publish_to_xiaohongshu(
    title: str,
    description: str,
    images: List[str],
    tags: Optional[List[str]] = None,
    account_name: str = "default",
    headless: bool = False
) -> PublishResult:
    """
    Publish an image note to Xiaohongshu using a temporary publisher.

    Args:
        title: Title.
        description: Description.
        images: List of image paths.
        tags: List of tags (defaults to an empty list).
        account_name: Account name.
        headless: Whether to run the browser in headless mode.

    Returns:
        The publish result.
    """
    from .models import ImageNote

    content = ImageNote(
        title=title,
        description=description,
        images=images,
        tags=tags or []
    )

    async with Publisher(headless=headless) as publisher:
        return await publisher.publish(PlatformType.XIAOHONGSHU, content, account_name)


async def publish_to_douyin(
    title: str,
    description: str,
    video_path: str,
    tags: Optional[List[str]] = None,
    account_name: str = "default",
    headless: bool = False
) -> PublishResult:
    """
    Publish a video to Douyin using a temporary publisher.

    Args:
        title: Title.
        description: Description.
        video_path: Path to the video file.
        tags: List of tags (defaults to an empty list).
        account_name: Account name.
        headless: Whether to run the browser in headless mode.

    Returns:
        The publish result.
    """
    from .models import VideoContent

    content = VideoContent(
        title=title,
        description=description,
        video_path=video_path,
        tags=tags or []
    )

    async with Publisher(headless=headless) as publisher:
        return await publisher.publish(PlatformType.DOUYIN, content, account_name)