2025-11-12 00:28:07 +08:00

460 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Cookie管理器
统一的Cookie存储、加载、验证和管理功能。
"""
import json
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
import base64
from ..core.models import PlatformType, AccountInfo
from ..config.settings import settings
from ..utils.logger import get_logger
logger = get_logger(__name__)
class CookieManager:
"""Cookie管理器"""
def __init__(self):
self.cookie_dir = settings.cookies_dir
self.cookie_dir.mkdir(parents=True, exist_ok=True)
self._encryption_key = None
self._fernet = None
# 初始化加密
if settings.security.encrypt_cookies:
self._init_encryption()
def _init_encryption(self):
"""初始化加密功能"""
try:
key_file = self.cookie_dir / ".encryption_key"
if key_file.exists():
# 读取现有密钥
with open(key_file, 'rb') as f:
self._encryption_key = f.read()
else:
# 生成新密钥
self._encryption_key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(self._encryption_key)
# 设置文件权限为仅所有者可读写
os.chmod(key_file, 0o600)
self._fernet = Fernet(self._encryption_key)
logger.debug("Cookie加密初始化成功")
except Exception as e:
logger.error(f"Cookie加密初始化失败: {e}")
settings.security.encrypt_cookies = False
def _encrypt_data(self, data: str) -> str:
"""加密数据"""
if not self._fernet:
return data
try:
encrypted = self._fernet.encrypt(data.encode('utf-8'))
return base64.b64encode(encrypted).decode('utf-8')
except Exception as e:
logger.warning(f"数据加密失败: {e}")
return data
def _decrypt_data(self, encrypted_data: str) -> str:
"""解密数据"""
if not self._fernet:
return encrypted_data
try:
encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8'))
decrypted = self._fernet.decrypt(encrypted_bytes)
return decrypted.decode('utf-8')
except Exception as e:
logger.warning(f"数据解密失败: {e}")
return encrypted_data
def get_cookie_file_path(self, platform: PlatformType, account_name: str) -> Path:
"""
获取Cookie文件路径
Args:
platform: 平台类型
account_name: 账号名称
Returns:
Cookie文件路径
"""
platform_dir = self.cookie_dir / platform.value
platform_dir.mkdir(parents=True, exist_ok=True)
return platform_dir / f"{account_name}.json"
async def save_cookies(
self,
platform: PlatformType,
account_name: str,
cookies: List[Dict[str, Any]],
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""
保存Cookie
Args:
platform: 平台类型
account_name: 账号名称
cookies: Cookie列表
metadata: 元数据
Returns:
保存是否成功
"""
try:
cookie_file = self.get_cookie_file_path(platform, account_name)
# 准备Cookie数据
cookie_data = {
"cookies": cookies,
"metadata": {
"platform": platform.value,
"account": account_name,
"saved_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(days=settings.security.cookie_expiry_days)).isoformat(),
**(metadata or {})
}
}
# 序列化为JSON
json_data = json.dumps(cookie_data, indent=2, ensure_ascii=False)
# 加密(如果启用)
if settings.security.encrypt_cookies:
json_data = self._encrypt_data(json_data)
# 保存为加密文件
encrypted_file = cookie_file.with_suffix('.enc')
with open(encrypted_file, 'w', encoding='utf-8') as f:
f.write(json_data)
# 删除未加密文件
if cookie_file.exists():
cookie_file.unlink()
else:
# 直接保存未加密文件
with open(cookie_file, 'w', encoding='utf-8') as f:
f.write(json_data)
logger.info(f"Cookie保存成功: {platform.value}/{account_name}")
return True
except Exception as e:
logger.error(f"Cookie保存失败: {e}")
return False
async def load_cookies(
self,
platform: PlatformType,
account_name: str
) -> Optional[List[Dict[str, Any]]]:
"""
加载Cookie
Args:
platform: 平台类型
account_name: 账号名称
Returns:
Cookie列表如果加载失败返回None
"""
try:
cookie_file = self.get_cookie_file_path(platform, account_name)
encrypted_file = cookie_file.with_suffix('.enc')
# 确定读取哪个文件
file_to_read = None
if encrypted_file.exists():
file_to_read = encrypted_file
elif cookie_file.exists():
file_to_read = cookie_file
if not file_to_read:
logger.debug(f"Cookie文件不存在: {platform.value}/{account_name}")
return None
# 读取文件
with open(file_to_read, 'r', encoding='utf-8') as f:
json_data = f.read()
# 解密(如果是加密文件)
if file_to_read == encrypted_file:
json_data = self._decrypt_data(json_data)
# 解析JSON
cookie_data = json.loads(json_data)
# 验证数据格式
if "cookies" not in cookie_data:
logger.error("Cookie文件格式错误")
return None
cookies = cookie_data["cookies"]
# 检查是否过期
metadata = cookie_data.get("metadata", {})
expires_at = metadata.get("expires_at")
if expires_at:
try:
expiry_time = datetime.fromisoformat(expires_at.replace('Z', '+00:00'))
if expiry_time < datetime.now():
logger.warning(f"Cookie已过期: {platform.value}/{account_name}")
# 删除过期Cookie
self.delete_cookies(platform, account_name)
return None
except:
pass
logger.debug(f"Cookie加载成功: {platform.value}/{account_name}")
return cookies
except Exception as e:
logger.error(f"Cookie加载失败: {e}")
return None
def delete_cookies(self, platform: PlatformType, account_name: str) -> bool:
"""
删除Cookie
Args:
platform: 平台类型
account_name: 账号名称
Returns:
删除是否成功
"""
try:
cookie_file = self.get_cookie_file_path(platform, account_name)
encrypted_file = cookie_file.with_suffix('.enc')
deleted = False
if cookie_file.exists():
cookie_file.unlink()
deleted = True
if encrypted_file.exists():
encrypted_file.unlink()
deleted = True
if deleted:
logger.info(f"Cookie删除成功: {platform.value}/{account_name}")
return deleted
except Exception as e:
logger.error(f"Cookie删除失败: {e}")
return False
def list_accounts(self, platform: PlatformType) -> List[str]:
"""
列出指定平台的所有账号
Args:
platform: 平台类型
Returns:
账号名称列表
"""
try:
platform_dir = self.cookie_dir / platform.value
if not platform_dir.exists():
return []
accounts = []
for file_path in platform_dir.glob("*.json"):
# 跳过加密密钥文件
if file_path.name.startswith('.'):
continue
accounts.append(file_path.stem)
# 检查加密文件
for file_path in platform_dir.glob("*.enc"):
if file_path.name.startswith('.'):
continue
accounts.append(file_path.stem)
return list(set(accounts)) # 去重
except Exception as e:
logger.error(f"获取账号列表失败: {e}")
return []
async def validate_cookies(
self,
platform: PlatformType,
account_name: str
) -> bool:
"""
验证Cookie是否有效
Args:
platform: 平台类型
account_name: 账号名称
Returns:
Cookie是否有效
"""
try:
cookies = await self.load_cookies(platform, account_name)
if not cookies:
return False
# 基本验证
if not isinstance(cookies, list) or len(cookies) == 0:
return False
# 检查关键Cookie
important_cookies = self._get_important_cookies(platform)
cookie_names = [c.get('name', '') for c in cookies]
has_important_cookies = any(
name in cookie_names for name in important_cookies
)
if not has_important_cookies:
logger.warning(f"Cookie缺少关键字段: {platform.value}/{account_name}")
return False
return True
except Exception as e:
logger.error(f"Cookie验证失败: {e}")
return False
def _get_important_cookies(self, platform: PlatformType) -> List[str]:
"""
获取平台的重要Cookie名称
Args:
platform: 平台类型
Returns:
重要Cookie名称列表
"""
cookie_mapping = {
PlatformType.XIAOHONGSHU: [
'sessionid', 'token', 'userid', 'webId', 'a1', 'web_session',
'webId', 'web_session', 'x-t', 'webId'
],
PlatformType.DOUYIN: [
'sessionid', 'sid_guard', 'uid_tt', 'sid_tt', 'ttcid',
'passport_csrf_token', '__ac_nonce', '__ac_signature',
'MONITOR_WEB_ID'
]
}
return cookie_mapping.get(platform, [])
async def cleanup_expired_cookies(self) -> int:
"""
清理过期的Cookie
Returns:
清理的Cookie数量
"""
try:
cleaned_count = 0
for platform in PlatformType:
accounts = self.list_accounts(platform)
for account in accounts:
if not await self.validate_cookies(platform, account):
self.delete_cookies(platform, account)
cleaned_count += 1
logger.info(f"清理过期Cookie: {platform.value}/{account}")
logger.info(f"Cookie清理完成共清理 {cleaned_count} 个过期Cookie")
return cleaned_count
except Exception as e:
logger.error(f"Cookie清理失败: {e}")
return 0
async def get_cookie_metadata(
self,
platform: PlatformType,
account_name: str
) -> Optional[Dict[str, Any]]:
"""
获取Cookie元数据
Args:
platform: 平台类型
account_name: 账号名称
Returns:
Cookie元数据
"""
try:
cookie_file = self.get_cookie_file_path(platform, account_name)
encrypted_file = cookie_file.with_suffix('.enc')
file_to_read = None
if encrypted_file.exists():
file_to_read = encrypted_file
elif cookie_file.exists():
file_to_read = cookie_file
if not file_to_read:
return None
with open(file_to_read, 'r', encoding='utf-8') as f:
json_data = f.read()
if file_to_read == encrypted_file:
json_data = self._decrypt_data(json_data)
cookie_data = json.loads(json_data)
return cookie_data.get("metadata", {})
except Exception as e:
logger.error(f"获取Cookie元数据失败: {e}")
return None
async def update_cookie_metadata(
self,
platform: PlatformType,
account_name: str,
metadata: Dict[str, Any]
) -> bool:
"""
更新Cookie元数据
Args:
platform: 平台类型
account_name: 账号名称
metadata: 新的元数据
Returns:
更新是否成功
"""
try:
cookies = await self.load_cookies(platform, account_name)
if not cookies:
return False
current_metadata = await self.get_cookie_metadata(platform, account_name) or {}
updated_metadata = {**current_metadata, **metadata}
updated_metadata['updated_at'] = datetime.now().isoformat()
return await self.save_cookies(platform, account_name, cookies, updated_metadata)
except Exception as e:
logger.error(f"更新Cookie元数据失败: {e}")
return False
# 全局Cookie管理器实例
cookie_manager = CookieManager()