bangbang-aigc-server/core/cookie_manager.py

251 lines
9.7 KiB
Python
Raw Permalink Normal View History

2025-07-31 15:35:23 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Cookie管理器
支持多个cookies的管理验证和轮换使用
"""
import json
import os
import time
import random
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
from .models import CookieInfo, CookieStats
logger = logging.getLogger(__name__)


class CookieManager:
    """Manage a pool of named cookies persisted in a JSON config file.

    Cookies are kept in ``self.cookies`` keyed by name. Every mutating
    operation writes the whole pool back to ``config_path``. A cookie is
    considered usable while ``is_valid`` is true and its ``failure_count``
    is below ``max_failures``.
    """

    def __init__(self, config_path: str = "config/cookies.json",
                 max_failures: int = 3, rotation_interval: int = 300):
        """Create the manager and load cookies from ``config_path``.

        Args:
            config_path: Path of the JSON file holding the cookie pool.
            max_failures: Failure count at which a cookie is marked invalid.
            rotation_interval: Idle time in seconds after which
                ``rotate_cookies`` drops the current cookie.
        """
        self.config_path = config_path
        self.cookies: Dict[str, CookieInfo] = {}
        self.current_cookie: Optional[str] = None
        self.max_failures = max_failures
        self.rotation_interval = rotation_interval
        self.load_cookies()

    @staticmethod
    def _parse_last_used(value) -> datetime:
        """Convert a stored ``last_used`` value into a naive local datetime.

        Unparseable values fall back to the current time so that a damaged
        timestamp does not make the whole cookie unusable.
        """
        if isinstance(value, datetime):
            parsed = value
        else:
            try:
                parsed = datetime.fromisoformat(str(value))
            except ValueError:
                logger.warning(f"无法解析时间戳 {value!r},使用当前时间")
                return datetime.now()
        if parsed.tzinfo is not None:
            # The rest of the class subtracts from the naive datetime.now();
            # mixing aware and naive datetimes raises TypeError.
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _ensure_config_dir(self) -> None:
        """Create the directory of ``config_path`` if it has one."""
        directory = os.path.dirname(self.config_path)
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if directory:
            os.makedirs(directory, exist_ok=True)

    def load_cookies(self) -> None:
        """Load the cookie pool from ``config_path``.

        A missing file triggers creation of a placeholder config. An
        existing file that cannot be read or parsed is only reported: it is
        never overwritten, so stored cookies are not lost on a load error.
        """
        if not os.path.exists(self.config_path):
            logger.warning(f"Cookie配置文件不存在: {self.config_path}")
            try:
                self._create_default_config()
            except OSError as e:
                logger.error(f"创建默认配置文件失败: {str(e)}")
            return

        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for cookie_data in data.get('cookies', []):
                cookie_info = CookieInfo(
                    name=cookie_data['name'],
                    cookie_string=cookie_data['cookie_string'],
                    # Stored as an ISO string; the rest of the class needs
                    # a datetime for arithmetic and .isoformat().
                    last_used=self._parse_last_used(
                        cookie_data.get('last_used', datetime.now())
                    ),
                    use_count=cookie_data.get('use_count', 0),
                    is_valid=cookie_data.get('is_valid', True),
                    failure_count=cookie_data.get('failure_count', 0),
                    user_info=cookie_data.get('user_info')
                )
                self.cookies[cookie_info.name] = cookie_info
            logger.info(f"加载了 {len(self.cookies)} 个cookies")
        except Exception as e:
            # Boundary handler: keep the manager usable with whatever was
            # loaded so far, and leave the file on disk untouched.
            logger.error(f"加载cookies失败: {str(e)}")

    def _create_default_config(self) -> None:
        """Write a config file containing a single placeholder cookie.

        The placeholder is written to disk only; it is not added to
        ``self.cookies``.

        Raises:
            OSError: If the directory or file cannot be created.
        """
        self._ensure_config_dir()
        default_config = {
            "cookies": [
                {
                    "name": "default",
                    "cookie_string": "请在此处填入您的小红书cookie",
                    "last_used": datetime.now().isoformat(),
                    "use_count": 0,
                    "is_valid": True,
                    "failure_count": 0,
                    "user_info": None
                }
            ]
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, ensure_ascii=False, indent=2)
        logger.info(f"创建默认配置文件: {self.config_path}")

    def save_cookies(self) -> None:
        """Write the whole cookie pool to ``config_path``.

        Failures are logged and swallowed; persisting is best-effort.
        """
        try:
            cookies_data = [
                {
                    'name': cookie_info.name,
                    'cookie_string': cookie_info.cookie_string,
                    'last_used': cookie_info.last_used.isoformat(),
                    'use_count': cookie_info.use_count,
                    'is_valid': cookie_info.is_valid,
                    'failure_count': cookie_info.failure_count,
                    'user_info': cookie_info.user_info
                }
                for cookie_info in self.cookies.values()
            ]
            config_data = {'cookies': cookies_data}
            self._ensure_config_dir()
            with open(self.config_path, 'w', encoding='utf-8') as f:
                json.dump(config_data, f, ensure_ascii=False, indent=2)
            logger.debug("保存cookies配置成功")
        except Exception as e:
            logger.error(f"保存cookies配置失败: {str(e)}")

    def add_cookie(self, name: str, cookie_string: str, user_info: Optional[Dict] = None) -> bool:
        """Add a new cookie and persist the pool.

        Returns:
            True if the cookie was added, False if ``name`` already exists.
        """
        if name in self.cookies:
            logger.warning(f"Cookie名称 '{name}' 已存在")
            return False
        cookie_info = CookieInfo(
            name=name,
            cookie_string=cookie_string,
            user_info=user_info
        )
        self.cookies[name] = cookie_info
        self.save_cookies()
        logger.info(f"添加新cookie: {name}")
        return True

    def remove_cookie(self, name: str) -> bool:
        """Remove a cookie by name and persist the pool.

        Returns:
            True if the cookie was removed, False if ``name`` is unknown.
        """
        if name not in self.cookies:
            logger.warning(f"Cookie名称 '{name}' 不存在")
            return False
        del self.cookies[name]
        if self.current_cookie == name:
            self.current_cookie = None
        self.save_cookies()
        logger.info(f"删除cookie: {name}")
        return True

    def get_valid_cookies(self) -> List[str]:
        """Return the names of cookies that are valid and below the failure limit."""
        return [name for name, cookie_info in self.cookies.items()
                if cookie_info.is_valid and cookie_info.failure_count < self.max_failures]

    def get_cookie(self, name: Optional[str] = None) -> Optional[CookieInfo]:
        """Return the named cookie, or the best-scoring usable one.

        Args:
            name: Cookie to fetch. When given, the cookie is returned
                regardless of its validity.

        Returns:
            The cookie, or None if ``name`` is unknown or no cookie is usable.
        """
        if name:
            return self.cookies.get(name)

        valid_cookies = self.get_valid_cookies()
        if not valid_cookies:
            logger.error("没有可用的cookies")
            return None

        now = datetime.now()

        def score(cookie_name: str) -> float:
            # Lower is better: failures * 10 + uses - hours since last use,
            # so idle cookies with few failures and uses are preferred.
            cookie_info = self.cookies[cookie_name]
            idle_hours = (now - cookie_info.last_used).total_seconds() / 3600
            return cookie_info.failure_count * 10 + cookie_info.use_count - idle_hours

        # min() keeps the first of several equal scores, like a strict "<" scan.
        return self.cookies[min(valid_cookies, key=score)]

    def get_cookie_string(self, name: Optional[str] = None) -> Optional[str]:
        """Return a cookie string and record the use.

        Updates ``last_used`` and ``use_count``, makes the cookie the
        current one and persists the pool.

        Returns:
            The cookie string, or None if no cookie could be selected.
        """
        cookie_info = self.get_cookie(name)
        if cookie_info:
            cookie_info.last_used = datetime.now()
            cookie_info.use_count += 1
            self.current_cookie = cookie_info.name
            self.save_cookies()
            return cookie_info.cookie_string
        return None

    def mark_cookie_failure(self, name: Optional[str] = None) -> None:
        """Record a failed use of ``name`` (default: the current cookie).

        The cookie is marked invalid once its failure count reaches
        ``max_failures``. Unknown names are ignored.
        """
        cookie_name = name or self.current_cookie
        if cookie_name and cookie_name in self.cookies:
            cookie_info = self.cookies[cookie_name]
            cookie_info.failure_count += 1
            if cookie_info.failure_count >= self.max_failures:
                cookie_info.is_valid = False
                logger.warning(f"Cookie '{cookie_name}' 已失效")
            self.save_cookies()

    def mark_cookie_success(self, name: Optional[str] = None) -> None:
        """Record a successful use of ``name`` (default: the current cookie).

        Resets the failure count and marks the cookie valid again. Unknown
        names are ignored.
        """
        cookie_name = name or self.current_cookie
        if cookie_name and cookie_name in self.cookies:
            cookie_info = self.cookies[cookie_name]
            cookie_info.failure_count = 0
            cookie_info.is_valid = True
            self.save_cookies()

    def get_random_cookie(self) -> Optional[CookieInfo]:
        """Return a randomly chosen usable cookie, or None if there is none.

        Unlike ``get_cookie_string`` this does not update usage data.
        """
        valid_cookies = self.get_valid_cookies()
        if not valid_cookies:
            return None
        cookie_name = random.choice(valid_cookies)
        return self.cookies[cookie_name]

    def get_statistics(self) -> CookieStats:
        """Return pool totals plus per-cookie usage details."""
        total_cookies = len(self.cookies)
        valid_cookies = len(self.get_valid_cookies())
        invalid_cookies = total_cookies - valid_cookies
        cookie_details = [
            {
                'name': name,
                'is_valid': cookie_info.is_valid,
                'use_count': cookie_info.use_count,
                'failure_count': cookie_info.failure_count,
                'last_used': cookie_info.last_used.isoformat()
            }
            for name, cookie_info in self.cookies.items()
        ]
        return CookieStats(
            total_cookies=total_cookies,
            valid_cookies=valid_cookies,
            invalid_cookies=invalid_cookies,
            current_cookie=self.current_cookie,
            cookie_details=cookie_details
        )

    def refresh_cookie_validity(self) -> None:
        """Revalidate cookies that are below the failure limit.

        Such cookies are marked valid and their failure count is reset.
        Cookies that already reached ``max_failures`` are left unchanged.
        """
        for cookie_info in self.cookies.values():
            if cookie_info.failure_count < self.max_failures:
                cookie_info.is_valid = True
                cookie_info.failure_count = 0
        self.save_cookies()
        logger.info("已刷新所有cookie的有效性")

    def rotate_cookies(self) -> None:
        """Drop the current cookie if it has been idle too long.

        Intended to be called periodically. Does nothing unless more than
        one usable cookie exists. This only clears ``current_cookie``; the
        next cookie is chosen by ``get_cookie`` on the following request.
        """
        if len(self.get_valid_cookies()) <= 1:
            return
        if self.current_cookie:
            current_cookie_info = self.cookies.get(self.current_cookie)
            if current_cookie_info:
                time_since_last_use = (datetime.now() - current_cookie_info.last_used).total_seconds()
                if time_since_last_use > self.rotation_interval:
                    self.current_cookie = None
                    logger.info("Cookie轮换切换到其他cookie")