2025-11-12 00:28:07 +08:00

390 lines
11 KiB
Python

"""
基础认证类
定义了所有平台认证的通用接口和方法。
"""
import json
import asyncio
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from playwright.async_api import Page, BrowserContext
from ..core.models import PlatformType, AccountInfo
from ..config.settings import settings
from ..config.platform_config import get_platform_url, get_selectors, get_wait_times
from ..utils.browser import browser_manager
from ..utils.logger import get_platform_logger
from ..utils.exceptions import (
LoginFailedError,
AuthenticationError,
TimeoutError,
CookieExpiredError,
ElementNotFoundError
)
# Module-level logger obtained under the name "auth". BaseAuth instances build
# their own per-platform logger in __init__ and log through that one instead.
logger = get_platform_logger("auth")
def _timestamp_has_passed(expires: Any) -> bool:
    """Return True when ``expires`` is a positive Unix timestamp in the past.

    Zero, negative values (Playwright reports ``-1`` for session cookies),
    ``None`` and non-numeric values are treated as "no expiry known" and
    yield False. The comparison is done on raw timestamps so that far-future
    values cannot overflow ``datetime.fromtimestamp``.
    """
    if not isinstance(expires, (int, float)):
        return False
    return 0 < expires < datetime.now().timestamp()


class BaseAuth(ABC):
    """Abstract base class shared by all platform authentication flows.

    Subclasses implement ``login`` and ``check_login_status``; this class
    provides cookie persistence, page creation and small Playwright helpers.
    """

    def __init__(self, platform: PlatformType):
        """Bind the instance to a platform and make sure its cookie dir exists.

        Args:
            platform: Platform enum member; its ``value`` is used as the
                platform name, logger name and cookie sub-directory.
        """
        self.platform = platform
        self.platform_name = platform.value
        self.logger = get_platform_logger(self.platform_name)
        self.cookie_dir = settings.cookies_dir / self.platform_name
        self.cookie_dir.mkdir(parents=True, exist_ok=True)

    @abstractmethod
    async def login(self, account_info: AccountInfo, headless: bool = False) -> bool:
        """Run the login flow.

        Args:
            account_info: Account to log in with.
            headless: Whether to run the browser in headless mode.

        Returns:
            True if login succeeded.
        """

    @abstractmethod
    async def check_login_status(self, page: Page) -> bool:
        """Check whether the given page is currently logged in.

        Args:
            page: Playwright page object.

        Returns:
            True if the session is logged in.
        """

    async def load_cookies(self, page: Page, account_info: AccountInfo) -> bool:
        """Load the account's saved cookies into the page's browser context.

        Args:
            page: Playwright page object.
            account_info: Account whose ``cookie_file`` should be loaded.

        Returns:
            True if at least one valid cookie was added to the context;
            False if the file is missing, empty, expired, malformed, or any
            error occurred (errors are logged, never raised).
        """
        # Keys written by save_cookies for bookkeeping only; they are not
        # cookie fields, so they are dropped before calling Playwright.
        metadata_keys = ('platform', 'saved_at')
        try:
            cookie_file = self.get_cookie_file_path(account_info)
            if not cookie_file.exists():
                self.logger.info(f"Cookie文件不存在: {cookie_file}")
                return False

            with open(cookie_file, 'r', encoding='utf-8') as f:
                cookies = json.load(f)

            if not cookies:
                self.logger.warning("Cookie文件为空")
                return False

            # A cookie file must hold a JSON array; anything else is unusable.
            if not isinstance(cookies, list):
                self.logger.warning("没有有效的Cookie")
                return False

            # Reject the whole set if a key cookie or the save time is stale.
            if self._is_cookies_expired(cookies):
                self.logger.warning("Cookie已过期")
                return False

            # Keep only well-formed, unexpired cookies.
            valid_cookies = [
                {k: v for k, v in c.items() if k not in metadata_keys}
                for c in cookies
                if self._is_valid_cookie(c)
            ]
            if not valid_cookies:
                self.logger.warning("没有有效的Cookie")
                return False

            await page.context.add_cookies(valid_cookies)
            self.logger.info(f"Cookie加载成功: {len(valid_cookies)}")
            return True
        except Exception as e:
            # Best-effort boundary: callers fall back to a fresh login.
            self.logger.error(f"Cookie加载失败: {e}")
            return False

    async def save_cookies(self, page: Page, account_info: AccountInfo) -> bool:
        """Dump the page context's cookies to the account's cookie file.

        Each cookie is tagged with the platform name and a ``saved_at``
        ISO timestamp; cookies without an ``expires`` key get one set to
        30 days from now.

        Args:
            page: Playwright page object.
            account_info: Account whose ``cookie_file`` should be written.

        Returns:
            True if the file was written; False if there were no cookies or
            an error occurred (errors are logged, never raised).
        """
        try:
            cookies = await page.context.cookies()
            if not cookies:
                self.logger.warning("没有获取到Cookie")
                return False

            # Compute the timestamps once so every cookie in the file agrees.
            now = datetime.now()
            saved_at = now.isoformat()
            default_expiry = (now + timedelta(days=30)).timestamp()
            for cookie in cookies:
                cookie['platform'] = self.platform_name
                cookie['saved_at'] = saved_at
                cookie.setdefault('expires', default_expiry)

            cookie_file = self.get_cookie_file_path(account_info)
            with open(cookie_file, 'w', encoding='utf-8') as f:
                json.dump(cookies, f, indent=2, ensure_ascii=False)

            self.logger.info(f"Cookie保存成功: {cookie_file}")
            return True
        except Exception as e:
            self.logger.error(f"Cookie保存失败: {e}")
            return False

    def _is_valid_cookie(self, cookie: Dict[str, Any]) -> bool:
        """Check that a single cookie is well-formed and not expired.

        Args:
            cookie: Cookie mapping as stored in the cookie file.

        Returns:
            True if the cookie has ``name``, ``value`` and a non-empty
            ``domain``, and its ``expires`` (if a positive number) is still
            in the future.
        """
        if not isinstance(cookie, dict):
            return False

        # Required fields.
        if not all(field in cookie for field in ('name', 'value', 'domain')):
            return False

        # Domain must be non-empty.
        if not cookie.get('domain'):
            return False

        # A present but non-numeric expiry marks the cookie as malformed.
        expires = cookie.get('expires')
        if expires is not None and not isinstance(expires, (int, float)):
            return False

        return not _timestamp_has_passed(expires)

    def _is_cookies_expired(self, cookies: List[Dict[str, Any]]) -> bool:
        """Decide whether a saved cookie set should be treated as expired.

        The set is expired when it is empty, when any key session cookie
        (``sessionid``, ``token``, ``sid``) has passed its expiry, or when
        the first cookie's ``saved_at`` is older than
        ``settings.security.cookie_expiry_days``.

        Args:
            cookies: Cookie list as stored in the cookie file.

        Returns:
            True if the set is considered expired.
        """
        if not cookies:
            return True

        # Key session cookies: one expired entry invalidates the whole set.
        key_names = ('sessionid', 'token', 'sid')
        for cookie in cookies:
            if cookie.get('name') in key_names and _timestamp_has_passed(cookie.get('expires')):
                return True

        # Age check based on when the file was written.
        saved_at = cookies[0].get('saved_at')
        if not saved_at:
            return False
        try:
            save_time = datetime.fromisoformat(str(saved_at).replace('Z', '+00:00'))
            # Compare like with like: an aware save_time needs an aware "now",
            # a naive one (what save_cookies writes) needs naive local time.
            now = datetime.now(save_time.tzinfo)
            max_age = timedelta(days=settings.security.cookie_expiry_days)
            return save_time + max_age < now
        except (TypeError, ValueError, OverflowError) as e:
            # An unreadable save time is not treated as expiry (best effort).
            self.logger.warning(f"Cookie保存时间校验失败: {e}")
            return False

    async def create_page(self, account_info: AccountInfo, headless: bool = False) -> Page:
        """Create a page via ``browser_manager`` for the given account.

        Args:
            account_info: Account whose ``user_data_dir`` backs the context.
            headless: Whether to run the browser in headless mode.

        Returns:
            Playwright page object.

        Raises:
            AuthenticationError: If the browser, context or page could not
                be created; the original exception is chained as the cause.
        """
        try:
            browser = await browser_manager.get_browser(headless=headless)
            context = await browser_manager.get_context(
                browser=browser,
                user_data_dir=account_info.user_data_dir
            )
            return await browser_manager.get_page(context=context)
        except Exception as e:
            raise AuthenticationError(f"页面创建失败: {e}", platform=self.platform_name) from e

    async def wait_for_login_success(
        self,
        page: Page,
        timeout: int = 300,
        check_interval: float = 2.0
    ) -> bool:
        """Poll ``check_login_status`` until it succeeds or the timeout hits.

        Args:
            page: Playwright page object.
            timeout: Maximum time to wait, in seconds.
            check_interval: Delay between checks, in seconds.

        Returns:
            True once the login is detected.

        Raises:
            TimeoutError: If login was not detected within ``timeout``.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        self.logger.info("等待用户扫码登录...")

        while loop.time() < deadline:
            try:
                if await self.check_login_status(page):
                    self.logger.info("登录成功!")
                    return True
            except Exception as e:
                # A failed probe is not fatal; keep polling until the deadline.
                self.logger.warning(f"登录状态检查失败: {e}")
            await asyncio.sleep(check_interval)

        raise TimeoutError(f"登录超时: {timeout}", timeout=timeout)

    async def safe_goto(self, page: Page, url: str, timeout: int = 30000) -> bool:
        """Navigate to ``url`` without raising on failure.

        Args:
            page: Playwright page object.
            url: Target URL.
            timeout: Navigation timeout, in milliseconds.

        Returns:
            True if navigation succeeded, False otherwise (error is logged).
        """
        try:
            await page.goto(url, timeout=timeout, wait_until="domcontentloaded")
            await asyncio.sleep(2)  # fixed grace period after DOMContentLoaded
            return True
        except Exception as e:
            self.logger.error(f"页面导航失败: {url} - {e}")
            return False

    async def wait_for_element(
        self,
        page: Page,
        selector: str,
        timeout: int = 10000
    ) -> bool:
        """Wait for an element matching ``selector`` to appear.

        Args:
            page: Playwright page object.
            selector: Element selector.
            timeout: Maximum wait, in milliseconds.

        Returns:
            True if the element appeared, False on timeout or error.
        """
        try:
            await page.wait_for_selector(selector, timeout=timeout)
            return True
        except Exception:
            # Deliberately not a bare ``except``: task cancellation and
            # KeyboardInterrupt must propagate.
            return False

    async def click_element_safely(self, page: Page, selector: str, timeout: int = 10000) -> bool:
        """Wait for an element and click it without raising on failure.

        Args:
            page: Playwright page object.
            selector: Element selector.
            timeout: Maximum wait for the element, in milliseconds.

        Returns:
            True if the element was found and clicked, False otherwise.
        """
        try:
            element = await page.wait_for_selector(selector, timeout=timeout)
            if not element:
                return False
            await element.click()
            return True
        except Exception as e:
            self.logger.warning(f"元素点击失败: {selector} - {e}")
            return False

    def get_cookie_file_path(self, account_info: AccountInfo) -> Path:
        """Return the path of the account's cookie file.

        Args:
            account_info: Account whose ``cookie_file`` name is used.

        Returns:
            ``cookie_dir / account_info.cookie_file``.
        """
        return self.cookie_dir / account_info.cookie_file

    def delete_cookies(self, account_info: AccountInfo) -> bool:
        """Delete the account's cookie file if it exists.

        Args:
            account_info: Account whose cookie file should be removed.

        Returns:
            True if the file was removed or did not exist; False if an
            error occurred (error is logged).
        """
        try:
            cookie_file = self.get_cookie_file_path(account_info)
            if cookie_file.exists():
                cookie_file.unlink()
                self.logger.info(f"Cookie文件删除成功: {cookie_file}")
            return True
        except Exception as e:
            self.logger.error(f"Cookie文件删除失败: {e}")
            return False

    async def validate_session(self, page: Page) -> bool:
        """Validate that the page holds a usable logged-in session.

        Args:
            page: Playwright page object.

        Returns:
            True if ``check_login_status`` passes and the current URL does
            not contain ``login`` or ``auth``; False otherwise or on error.
        """
        try:
            if not await self.check_login_status(page):
                return False

            # Being on a login/auth URL means the session was bounced.
            current_url = page.url.lower()
            if "login" in current_url or "auth" in current_url:
                return False

            return True
        except Exception as e:
            self.logger.warning(f"会话验证失败: {e}")
            return False