251 lines
9.7 KiB
Python
251 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
Cookie管理器
|
||
支持多个cookies的管理、验证和轮换使用
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import time
|
||
import random
|
||
from typing import Dict, List, Optional, Tuple
|
||
from datetime import datetime, timedelta
|
||
import logging
|
||
|
||
from .models import CookieInfo, CookieStats
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class CookieManager:
    """Manage a pool of named cookies: persistence, selection, health and rotation.

    Cookies are kept in memory in ``self.cookies`` (name -> ``CookieInfo``) and
    persisted as JSON at ``config_path`` under the top-level key ``"cookies"``.
    Every mutating operation writes the whole pool back to disk.
    """

    def __init__(self, config_path: str = "config/cookies.json"):
        """Initialise the manager and load cookies from ``config_path``.

        Args:
            config_path: Path of the JSON config file. A default template is
                written there when the file does not exist.
        """
        self.config_path = config_path
        self.cookies: Dict[str, "CookieInfo"] = {}
        self.current_cookie: Optional[str] = None
        self.max_failures = 3  # failure count at which a cookie is marked invalid
        self.rotation_interval = 300  # rotation interval, in seconds
        self.load_cookies()

    @staticmethod
    def _parse_last_used(value) -> datetime:
        """Coerce a stored ``last_used`` value to a ``datetime``.

        The config stores ISO-8601 strings, while the rest of this class does
        datetime arithmetic on ``last_used``. Missing or unparsable values
        fall back to the current time.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                pass
        return datetime.now()

    def _ensure_config_dir(self) -> None:
        """Create the parent directory of ``config_path`` if it has one."""
        directory = os.path.dirname(self.config_path)
        # os.makedirs("") raises FileNotFoundError, so skip bare filenames.
        if directory:
            os.makedirs(directory, exist_ok=True)

    def load_cookies(self) -> None:
        """Load cookies from the config file into ``self.cookies``.

        If the file is missing, a default template is created. If the file
        exists but cannot be read or parsed, the error is logged and the file
        is left untouched so stored cookies are never overwritten by the
        template. Malformed individual entries are skipped. Never raises.
        """
        if not os.path.exists(self.config_path):
            logger.warning(f"Cookie配置文件不存在: {self.config_path}")
            try:
                self._create_default_config()
            except OSError as e:
                logger.error(f"加载cookies失败: {str(e)}")
            return

        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            for cookie_data in data.get('cookies', []):
                try:
                    cookie_info = CookieInfo(
                        name=cookie_data['name'],
                        cookie_string=cookie_data['cookie_string'],
                        last_used=self._parse_last_used(cookie_data.get('last_used')),
                        use_count=cookie_data.get('use_count', 0),
                        is_valid=cookie_data.get('is_valid', True),
                        failure_count=cookie_data.get('failure_count', 0),
                        user_info=cookie_data.get('user_info'),
                    )
                except (KeyError, TypeError, ValueError, AttributeError) as e:
                    # One bad entry must not prevent the others from loading.
                    logger.warning(f"跳过无效的cookie配置项: {str(e)}")
                    continue
                self.cookies[cookie_info.name] = cookie_info

            logger.info(f"加载了 {len(self.cookies)} 个cookies")
        except Exception as e:
            # Boundary handler: construction must not fail on a bad config.
            # Deliberately do NOT rewrite the file here.
            logger.error(f"加载cookies失败: {str(e)}")

    def _create_default_config(self) -> None:
        """Write a template config containing one placeholder cookie.

        Raises:
            OSError: If the directory or file cannot be created.
        """
        self._ensure_config_dir()
        default_config = {
            "cookies": [
                {
                    "name": "default",
                    "cookie_string": "请在此处填入您的小红书cookie",
                    "last_used": datetime.now().isoformat(),
                    "use_count": 0,
                    "is_valid": True,
                    "failure_count": 0,
                    "user_info": None,
                }
            ]
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, ensure_ascii=False, indent=2)
        logger.info(f"创建默认配置文件: {self.config_path}")

    def save_cookies(self) -> None:
        """Persist all cookies to the config file.

        Failures are logged rather than raised, so callers can treat saving
        as best-effort.
        """
        try:
            cookies_data = [
                {
                    'name': cookie_info.name,
                    'cookie_string': cookie_info.cookie_string,
                    'last_used': cookie_info.last_used.isoformat(),
                    'use_count': cookie_info.use_count,
                    'is_valid': cookie_info.is_valid,
                    'failure_count': cookie_info.failure_count,
                    'user_info': cookie_info.user_info,
                }
                for cookie_info in self.cookies.values()
            ]
            self._ensure_config_dir()
            with open(self.config_path, 'w', encoding='utf-8') as f:
                json.dump({'cookies': cookies_data}, f, ensure_ascii=False, indent=2)
            logger.debug("保存cookies配置成功")
        except Exception as e:
            logger.error(f"保存cookies配置失败: {str(e)}")

    def add_cookie(self, name: str, cookie_string: str, user_info: Optional[Dict] = None) -> bool:
        """Add a new cookie and persist the pool.

        Returns:
            ``True`` if the cookie was added, ``False`` if ``name`` is taken.
        """
        if name in self.cookies:
            logger.warning(f"Cookie名称 '{name}' 已存在")
            return False

        self.cookies[name] = CookieInfo(
            name=name,
            cookie_string=cookie_string,
            user_info=user_info,
        )
        self.save_cookies()
        logger.info(f"添加新cookie: {name}")
        return True

    def remove_cookie(self, name: str) -> bool:
        """Remove a cookie by name and persist the pool.

        Clears ``current_cookie`` when it refers to the removed cookie.

        Returns:
            ``True`` if the cookie was removed, ``False`` if it did not exist.
        """
        if name not in self.cookies:
            logger.warning(f"Cookie名称 '{name}' 不存在")
            return False

        del self.cookies[name]
        if self.current_cookie == name:
            self.current_cookie = None
        self.save_cookies()
        logger.info(f"删除cookie: {name}")
        return True

    def get_valid_cookies(self) -> List[str]:
        """Return the names of cookies that are valid and below the failure limit."""
        return [
            name
            for name, cookie_info in self.cookies.items()
            if cookie_info.is_valid and cookie_info.failure_count < self.max_failures
        ]

    def get_cookie(self, name: Optional[str] = None) -> Optional["CookieInfo"]:
        """Return the named cookie, or the best usable one when no name is given.

        A named lookup returns the cookie regardless of its validity.
        Without a name, the usable cookie with the lowest score wins, where
        ``score = failure_count * 10 + use_count - hours_since_last_use``;
        ties go to the first cookie in insertion order.

        Returns:
            The selected ``CookieInfo``, or ``None`` when the name is unknown
            or no usable cookie exists.
        """
        if name:
            return self.cookies.get(name)

        valid_cookies = self.get_valid_cookies()
        if not valid_cookies:
            logger.error("没有可用的cookies")
            return None

        # One clock snapshot so every candidate is scored against the same instant.
        now = datetime.now()

        def score(cookie_name: str) -> float:
            cookie_info = self.cookies[cookie_name]
            idle_hours = (now - cookie_info.last_used).total_seconds() / 3600
            return cookie_info.failure_count * 10 + cookie_info.use_count - idle_hours

        return self.cookies[min(valid_cookies, key=score)]

    def get_cookie_string(self, name: Optional[str] = None) -> Optional[str]:
        """Return a cookie string and record the use.

        Updates ``last_used`` and ``use_count``, sets ``current_cookie`` and
        persists the pool. Returns ``None`` when no cookie is available.
        """
        cookie_info = self.get_cookie(name)
        if not cookie_info:
            return None

        cookie_info.last_used = datetime.now()
        cookie_info.use_count += 1
        self.current_cookie = cookie_info.name
        self.save_cookies()
        return cookie_info.cookie_string

    def mark_cookie_failure(self, name: Optional[str] = None) -> None:
        """Record a failed use of ``name`` (default: the current cookie).

        The cookie is marked invalid once its failure count reaches
        ``max_failures``. Unknown names are ignored.
        """
        cookie_name = name or self.current_cookie
        if not cookie_name or cookie_name not in self.cookies:
            return

        cookie_info = self.cookies[cookie_name]
        cookie_info.failure_count += 1
        if cookie_info.failure_count >= self.max_failures:
            cookie_info.is_valid = False
            logger.warning(f"Cookie '{cookie_name}' 已失效")
        self.save_cookies()

    def mark_cookie_success(self, name: Optional[str] = None) -> None:
        """Record a successful use of ``name`` (default: the current cookie).

        Resets the failure count and marks the cookie valid again. Unknown
        names are ignored.
        """
        cookie_name = name or self.current_cookie
        if not cookie_name or cookie_name not in self.cookies:
            return

        cookie_info = self.cookies[cookie_name]
        cookie_info.failure_count = 0
        cookie_info.is_valid = True
        self.save_cookies()

    def get_random_cookie(self) -> Optional["CookieInfo"]:
        """Return a randomly chosen usable cookie, or ``None`` if there is none."""
        valid_cookies = self.get_valid_cookies()
        if not valid_cookies:
            return None
        return self.cookies[random.choice(valid_cookies)]

    def get_statistics(self) -> "CookieStats":
        """Return pool totals, the current cookie name and per-cookie usage details."""
        total_cookies = len(self.cookies)
        valid_cookies = len(self.get_valid_cookies())

        cookie_details = [
            {
                'name': name,
                'is_valid': cookie_info.is_valid,
                'use_count': cookie_info.use_count,
                'failure_count': cookie_info.failure_count,
                'last_used': cookie_info.last_used.isoformat(),
            }
            for name, cookie_info in self.cookies.items()
        ]

        return CookieStats(
            total_cookies=total_cookies,
            valid_cookies=valid_cookies,
            invalid_cookies=total_cookies - valid_cookies,
            current_cookie=self.current_cookie,
            cookie_details=cookie_details,
        )

    def refresh_cookie_validity(self) -> None:
        """Reset failure counts for cookies still below the failure limit.

        Cookies whose ``failure_count`` is below ``max_failures`` are marked
        valid and their count is reset to zero. Cookies at or above the limit
        are left unchanged.

        NOTE(review): the log message says "all cookies", but cookies that
        already hit the limit are not revived here - confirm this is intended.
        """
        for cookie_info in self.cookies.values():
            if cookie_info.failure_count < self.max_failures:
                cookie_info.is_valid = True
                cookie_info.failure_count = 0
        self.save_cookies()
        logger.info("已刷新所有cookie的有效性")

    def rotate_cookies(self) -> None:
        """Release the current cookie if it has been idle past the rotation interval.

        Intended to be called periodically. Does nothing when at most one
        usable cookie exists. Only clears ``current_cookie``; it does not
        select a replacement itself.
        """
        if len(self.get_valid_cookies()) <= 1:
            return
        if not self.current_cookie:
            return

        current_cookie_info = self.cookies.get(self.current_cookie)
        if not current_cookie_info:
            return

        time_since_last_use = (datetime.now() - current_cookie_info.last_used).total_seconds()
        if time_since_last_use > self.rotation_interval:
            self.current_cookie = None
            logger.info("Cookie轮换:切换到其他cookie")