460 lines
14 KiB
Python
460 lines
14 KiB
Python
|
|
"""
|
|||
|
|
Cookie管理器
|
|||
|
|
统一的Cookie存储、加载、验证和管理功能。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Dict, List, Any, Optional
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
from cryptography.fernet import Fernet
|
|||
|
|
import base64
|
|||
|
|
|
|||
|
|
from ..core.models import PlatformType, AccountInfo
|
|||
|
|
from ..config.settings import settings
|
|||
|
|
from ..utils.logger import get_logger
|
|||
|
|
|
|||
|
|
logger = get_logger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CookieManager:
|
|||
|
|
"""Cookie管理器"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self.cookie_dir = settings.cookies_dir
|
|||
|
|
self.cookie_dir.mkdir(parents=True, exist_ok=True)
|
|||
|
|
self._encryption_key = None
|
|||
|
|
self._fernet = None
|
|||
|
|
|
|||
|
|
# 初始化加密
|
|||
|
|
if settings.security.encrypt_cookies:
|
|||
|
|
self._init_encryption()
|
|||
|
|
|
|||
|
|
def _init_encryption(self):
|
|||
|
|
"""初始化加密功能"""
|
|||
|
|
try:
|
|||
|
|
key_file = self.cookie_dir / ".encryption_key"
|
|||
|
|
|
|||
|
|
if key_file.exists():
|
|||
|
|
# 读取现有密钥
|
|||
|
|
with open(key_file, 'rb') as f:
|
|||
|
|
self._encryption_key = f.read()
|
|||
|
|
else:
|
|||
|
|
# 生成新密钥
|
|||
|
|
self._encryption_key = Fernet.generate_key()
|
|||
|
|
with open(key_file, 'wb') as f:
|
|||
|
|
f.write(self._encryption_key)
|
|||
|
|
# 设置文件权限为仅所有者可读写
|
|||
|
|
os.chmod(key_file, 0o600)
|
|||
|
|
|
|||
|
|
self._fernet = Fernet(self._encryption_key)
|
|||
|
|
logger.debug("Cookie加密初始化成功")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie加密初始化失败: {e}")
|
|||
|
|
settings.security.encrypt_cookies = False
|
|||
|
|
|
|||
|
|
def _encrypt_data(self, data: str) -> str:
|
|||
|
|
"""加密数据"""
|
|||
|
|
if not self._fernet:
|
|||
|
|
return data
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
encrypted = self._fernet.encrypt(data.encode('utf-8'))
|
|||
|
|
return base64.b64encode(encrypted).decode('utf-8')
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"数据加密失败: {e}")
|
|||
|
|
return data
|
|||
|
|
|
|||
|
|
def _decrypt_data(self, encrypted_data: str) -> str:
|
|||
|
|
"""解密数据"""
|
|||
|
|
if not self._fernet:
|
|||
|
|
return encrypted_data
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8'))
|
|||
|
|
decrypted = self._fernet.decrypt(encrypted_bytes)
|
|||
|
|
return decrypted.decode('utf-8')
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"数据解密失败: {e}")
|
|||
|
|
return encrypted_data
|
|||
|
|
|
|||
|
|
def get_cookie_file_path(self, platform: PlatformType, account_name: str) -> Path:
|
|||
|
|
"""
|
|||
|
|
获取Cookie文件路径
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Cookie文件路径
|
|||
|
|
"""
|
|||
|
|
platform_dir = self.cookie_dir / platform.value
|
|||
|
|
platform_dir.mkdir(parents=True, exist_ok=True)
|
|||
|
|
return platform_dir / f"{account_name}.json"
|
|||
|
|
|
|||
|
|
async def save_cookies(
|
|||
|
|
self,
|
|||
|
|
platform: PlatformType,
|
|||
|
|
account_name: str,
|
|||
|
|
cookies: List[Dict[str, Any]],
|
|||
|
|
metadata: Optional[Dict[str, Any]] = None
|
|||
|
|
) -> bool:
|
|||
|
|
"""
|
|||
|
|
保存Cookie
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
cookies: Cookie列表
|
|||
|
|
metadata: 元数据
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
保存是否成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookie_file = self.get_cookie_file_path(platform, account_name)
|
|||
|
|
|
|||
|
|
# 准备Cookie数据
|
|||
|
|
cookie_data = {
|
|||
|
|
"cookies": cookies,
|
|||
|
|
"metadata": {
|
|||
|
|
"platform": platform.value,
|
|||
|
|
"account": account_name,
|
|||
|
|
"saved_at": datetime.now().isoformat(),
|
|||
|
|
"expires_at": (datetime.now() + timedelta(days=settings.security.cookie_expiry_days)).isoformat(),
|
|||
|
|
**(metadata or {})
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 序列化为JSON
|
|||
|
|
json_data = json.dumps(cookie_data, indent=2, ensure_ascii=False)
|
|||
|
|
|
|||
|
|
# 加密(如果启用)
|
|||
|
|
if settings.security.encrypt_cookies:
|
|||
|
|
json_data = self._encrypt_data(json_data)
|
|||
|
|
# 保存为加密文件
|
|||
|
|
encrypted_file = cookie_file.with_suffix('.enc')
|
|||
|
|
with open(encrypted_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(json_data)
|
|||
|
|
|
|||
|
|
# 删除未加密文件
|
|||
|
|
if cookie_file.exists():
|
|||
|
|
cookie_file.unlink()
|
|||
|
|
else:
|
|||
|
|
# 直接保存未加密文件
|
|||
|
|
with open(cookie_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(json_data)
|
|||
|
|
|
|||
|
|
logger.info(f"Cookie保存成功: {platform.value}/{account_name}")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie保存失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
async def load_cookies(
|
|||
|
|
self,
|
|||
|
|
platform: PlatformType,
|
|||
|
|
account_name: str
|
|||
|
|
) -> Optional[List[Dict[str, Any]]]:
|
|||
|
|
"""
|
|||
|
|
加载Cookie
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Cookie列表,如果加载失败返回None
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookie_file = self.get_cookie_file_path(platform, account_name)
|
|||
|
|
encrypted_file = cookie_file.with_suffix('.enc')
|
|||
|
|
|
|||
|
|
# 确定读取哪个文件
|
|||
|
|
file_to_read = None
|
|||
|
|
if encrypted_file.exists():
|
|||
|
|
file_to_read = encrypted_file
|
|||
|
|
elif cookie_file.exists():
|
|||
|
|
file_to_read = cookie_file
|
|||
|
|
|
|||
|
|
if not file_to_read:
|
|||
|
|
logger.debug(f"Cookie文件不存在: {platform.value}/{account_name}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 读取文件
|
|||
|
|
with open(file_to_read, 'r', encoding='utf-8') as f:
|
|||
|
|
json_data = f.read()
|
|||
|
|
|
|||
|
|
# 解密(如果是加密文件)
|
|||
|
|
if file_to_read == encrypted_file:
|
|||
|
|
json_data = self._decrypt_data(json_data)
|
|||
|
|
|
|||
|
|
# 解析JSON
|
|||
|
|
cookie_data = json.loads(json_data)
|
|||
|
|
|
|||
|
|
# 验证数据格式
|
|||
|
|
if "cookies" not in cookie_data:
|
|||
|
|
logger.error("Cookie文件格式错误")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
cookies = cookie_data["cookies"]
|
|||
|
|
|
|||
|
|
# 检查是否过期
|
|||
|
|
metadata = cookie_data.get("metadata", {})
|
|||
|
|
expires_at = metadata.get("expires_at")
|
|||
|
|
if expires_at:
|
|||
|
|
try:
|
|||
|
|
expiry_time = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
|
|||
|
|
if expiry_time < datetime.now():
|
|||
|
|
logger.warning(f"Cookie已过期: {platform.value}/{account_name}")
|
|||
|
|
# 删除过期Cookie
|
|||
|
|
self.delete_cookies(platform, account_name)
|
|||
|
|
return None
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
logger.debug(f"Cookie加载成功: {platform.value}/{account_name}")
|
|||
|
|
return cookies
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie加载失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def delete_cookies(self, platform: PlatformType, account_name: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
删除Cookie
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
删除是否成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookie_file = self.get_cookie_file_path(platform, account_name)
|
|||
|
|
encrypted_file = cookie_file.with_suffix('.enc')
|
|||
|
|
|
|||
|
|
deleted = False
|
|||
|
|
if cookie_file.exists():
|
|||
|
|
cookie_file.unlink()
|
|||
|
|
deleted = True
|
|||
|
|
|
|||
|
|
if encrypted_file.exists():
|
|||
|
|
encrypted_file.unlink()
|
|||
|
|
deleted = True
|
|||
|
|
|
|||
|
|
if deleted:
|
|||
|
|
logger.info(f"Cookie删除成功: {platform.value}/{account_name}")
|
|||
|
|
|
|||
|
|
return deleted
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie删除失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def list_accounts(self, platform: PlatformType) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
列出指定平台的所有账号
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
账号名称列表
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
platform_dir = self.cookie_dir / platform.value
|
|||
|
|
if not platform_dir.exists():
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
accounts = []
|
|||
|
|
for file_path in platform_dir.glob("*.json"):
|
|||
|
|
# 跳过加密密钥文件
|
|||
|
|
if file_path.name.startswith('.'):
|
|||
|
|
continue
|
|||
|
|
accounts.append(file_path.stem)
|
|||
|
|
|
|||
|
|
# 检查加密文件
|
|||
|
|
for file_path in platform_dir.glob("*.enc"):
|
|||
|
|
if file_path.name.startswith('.'):
|
|||
|
|
continue
|
|||
|
|
accounts.append(file_path.stem)
|
|||
|
|
|
|||
|
|
return list(set(accounts)) # 去重
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取账号列表失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
async def validate_cookies(
|
|||
|
|
self,
|
|||
|
|
platform: PlatformType,
|
|||
|
|
account_name: str
|
|||
|
|
) -> bool:
|
|||
|
|
"""
|
|||
|
|
验证Cookie是否有效
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Cookie是否有效
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookies = await self.load_cookies(platform, account_name)
|
|||
|
|
if not cookies:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 基本验证
|
|||
|
|
if not isinstance(cookies, list) or len(cookies) == 0:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
# 检查关键Cookie
|
|||
|
|
important_cookies = self._get_important_cookies(platform)
|
|||
|
|
cookie_names = [c.get('name', '') for c in cookies]
|
|||
|
|
|
|||
|
|
has_important_cookies = any(
|
|||
|
|
name in cookie_names for name in important_cookies
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if not has_important_cookies:
|
|||
|
|
logger.warning(f"Cookie缺少关键字段: {platform.value}/{account_name}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie验证失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def _get_important_cookies(self, platform: PlatformType) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
获取平台的重要Cookie名称
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
重要Cookie名称列表
|
|||
|
|
"""
|
|||
|
|
cookie_mapping = {
|
|||
|
|
PlatformType.XIAOHONGSHU: [
|
|||
|
|
'sessionid', 'token', 'userid', 'webId', 'a1', 'web_session',
|
|||
|
|
'webId', 'web_session', 'x-t', 'webId'
|
|||
|
|
],
|
|||
|
|
PlatformType.DOUYIN: [
|
|||
|
|
'sessionid', 'sid_guard', 'uid_tt', 'sid_tt', 'ttcid',
|
|||
|
|
'passport_csrf_token', '__ac_nonce', '__ac_signature',
|
|||
|
|
'MONITOR_WEB_ID'
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return cookie_mapping.get(platform, [])
|
|||
|
|
|
|||
|
|
async def cleanup_expired_cookies(self) -> int:
|
|||
|
|
"""
|
|||
|
|
清理过期的Cookie
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
清理的Cookie数量
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cleaned_count = 0
|
|||
|
|
|
|||
|
|
for platform in PlatformType:
|
|||
|
|
accounts = self.list_accounts(platform)
|
|||
|
|
for account in accounts:
|
|||
|
|
if not await self.validate_cookies(platform, account):
|
|||
|
|
self.delete_cookies(platform, account)
|
|||
|
|
cleaned_count += 1
|
|||
|
|
logger.info(f"清理过期Cookie: {platform.value}/{account}")
|
|||
|
|
|
|||
|
|
logger.info(f"Cookie清理完成,共清理 {cleaned_count} 个过期Cookie")
|
|||
|
|
return cleaned_count
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Cookie清理失败: {e}")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
async def get_cookie_metadata(
|
|||
|
|
self,
|
|||
|
|
platform: PlatformType,
|
|||
|
|
account_name: str
|
|||
|
|
) -> Optional[Dict[str, Any]]:
|
|||
|
|
"""
|
|||
|
|
获取Cookie元数据
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Cookie元数据
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookie_file = self.get_cookie_file_path(platform, account_name)
|
|||
|
|
encrypted_file = cookie_file.with_suffix('.enc')
|
|||
|
|
|
|||
|
|
file_to_read = None
|
|||
|
|
if encrypted_file.exists():
|
|||
|
|
file_to_read = encrypted_file
|
|||
|
|
elif cookie_file.exists():
|
|||
|
|
file_to_read = cookie_file
|
|||
|
|
|
|||
|
|
if not file_to_read:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
with open(file_to_read, 'r', encoding='utf-8') as f:
|
|||
|
|
json_data = f.read()
|
|||
|
|
|
|||
|
|
if file_to_read == encrypted_file:
|
|||
|
|
json_data = self._decrypt_data(json_data)
|
|||
|
|
|
|||
|
|
cookie_data = json.loads(json_data)
|
|||
|
|
return cookie_data.get("metadata", {})
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"获取Cookie元数据失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def update_cookie_metadata(
|
|||
|
|
self,
|
|||
|
|
platform: PlatformType,
|
|||
|
|
account_name: str,
|
|||
|
|
metadata: Dict[str, Any]
|
|||
|
|
) -> bool:
|
|||
|
|
"""
|
|||
|
|
更新Cookie元数据
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
platform: 平台类型
|
|||
|
|
account_name: 账号名称
|
|||
|
|
metadata: 新的元数据
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
更新是否成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
cookies = await self.load_cookies(platform, account_name)
|
|||
|
|
if not cookies:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
current_metadata = await self.get_cookie_metadata(platform, account_name) or {}
|
|||
|
|
updated_metadata = {**current_metadata, **metadata}
|
|||
|
|
updated_metadata['updated_at'] = datetime.now().isoformat()
|
|||
|
|
|
|||
|
|
return await self.save_cookies(platform, account_name, cookies, updated_metadata)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"更新Cookie元数据失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 全局Cookie管理器实例
|
|||
|
|
cookie_manager = CookieManager()
|