autoUpload/examples/upload_images_to_xiaohongshu.py

339 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# 小红书图文上传脚本 - 智能适配单图和多图
import sys
import os
from pathlib import Path
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# The project root is the parent of this directory, because examples/ sits next to conf.py
project_root = os.path.dirname(current_dir)
# 将项目根目录添加到系统路径
sys.path.append(project_root)
import asyncio
import re
from datetime import datetime, timedelta
import random
import aiofiles
from conf import BASE_DIR
from uploader.xiaohongshu_uploader.main import XiaoHongShuImage, xiaohongshu_setup
from utils.files_times import generate_schedule_time_next_day
from utils.human_typing_wrapper import HumanTypingWrapper
from utils.enhanced_human_typing import EnhancedHumanTypingSimulator
from utils.network import async_retry
# Settings for the enhanced human-typing simulation.
# The random values are drawn once, when this module is imported.
ENHANCED_CONFIG = dict(
    # Simulated human state
    energy_level=random.uniform(0.7, 1.0),
    typing_proficiency=random.uniform(0.6, 0.9),
    emotion_state=random.uniform(0.8, 1.0),
    # Typing mistakes and how quickly they are corrected
    base_error_rate=random.uniform(0.02, 0.05),
    error_correction_speed=random.uniform(0.3, 0.8),
    # Speed control
    speed_variance=random.uniform(0.1, 0.2),
    burst_speed_probability=0.1,
)
# Legacy typing configuration, kept as the fallback used when the enhanced
# simulator is disabled.
HUMAN_CONFIG = dict(
    min_typing_speed=5,
    max_typing_speed=15,
    pause_probability=0.1,
    chunk_input=True,
    max_chunk_length=100,
    fatigue_effect=False,
)
class XiaohongshuImageUploader:
    """Helper that fills form fields with human-like typing.

    Typing is delegated to ``EnhancedHumanTypingSimulator`` (the default) or
    to the legacy ``HumanTypingWrapper`` configured with ``HUMAN_CONFIG``.

    Attributes:
        human_typer: The typing backend, or ``None`` when no page was given.
        use_enhanced: ``True`` when the enhanced backend is selected.
    """

    def __init__(self, page=None, use_enhanced=True):
        """Select and build the typing backend.

        Args:
            page: Object forwarded to the typing backend's constructor. When
                falsy, no backend is created and ``fill_form`` reports
                failure instead of typing.
            use_enhanced: Use the enhanced simulator when truthy, otherwise
                the legacy wrapper.
        """
        # Always define both attributes. Previously they were only set when
        # a page was supplied, so a page-less instance raised AttributeError
        # on first use.
        self.use_enhanced = bool(use_enhanced)
        self.human_typer = None
        if page:
            if use_enhanced:
                self.human_typer = EnhancedHumanTypingSimulator(page)
            else:
                self.human_typer = HumanTypingWrapper(page, HUMAN_CONFIG)

    @async_retry(timeout=60, max_retries=3)
    async def fill_form(self, selector, text, clear_first=True):
        """Type ``text`` into the element addressed by ``selector``.

        Args:
            selector: Target passed through to the typing backend.
            text: Text to type.
            clear_first: Forwarded to the legacy backend only; the enhanced
                backend is called without it.

        Returns:
            ``True`` when typing succeeded, ``False`` when the backend
            reported failure, raised, or was never created.
        """
        try:
            if self.human_typer is None:
                # Explicit failure instead of an AttributeError swallowed by
                # the blanket handler below.
                print("填写表单失败: 输入模拟器未初始化(未提供 page)")
                return False
            if self.use_enhanced:
                success = await self.human_typer.type_text(text, selector)
                if not success:
                    return False
            else:
                await self.human_typer.type_text_human(selector, text, clear_first)
            # Short pause after typing, as a person would before moving on.
            await asyncio.sleep(random.uniform(0.5, 1.0))
            return True
        except Exception as e:
            print(f"填写表单失败: {e}")
            return False

    @async_retry(timeout=30, max_retries=3)
    async def process_tags(self, tags_line):
        """Extract hashtag contents from a line such as ``"#a #b"``.

        Args:
            tags_line: Raw tag line. Only lines starting with ``#`` are
                parsed; anything else yields an empty list.

        Returns:
            Tag strings without the leading ``#``, in input order. A short
            random sleep follows each extracted tag.
        """
        tags = []
        if not tags_line.startswith('#'):
            return tags
        for token in tags_line.split():
            if not token.startswith('#'):
                continue
            tag_content = token[1:].strip()
            if tag_content:
                tags.append(tag_content)
                # Natural-looking gap between tags.
                await asyncio.sleep(random.uniform(0.2, 0.5))
        return tags
def get_image_groups_from_folder(images_folder):
    """Scan a folder for images and group them into posts by file name.

    Files whose stem ends in digits preceded by at least one non-digit
    character (``trip1.jpg``, ``trip2.png``) are grouped under the stem
    without the trailing number (``trip``) and ordered by that number.
    Every other image, including purely numeric names such as ``12.jpg``,
    forms its own group named after the full stem.

    Args:
        images_folder: Folder to scan, as ``str`` or ``Path``. Only direct
            children are considered.

    Returns:
        A list of dicts with the keys ``base_name`` (str), ``image_paths``
        (list of resolved ``Path`` objects in display order), ``count``
        (int) and ``type`` (``'single'`` or ``'multi'``). Groups appear in
        sorted-path order of their first image. Returns ``[]`` when the
        folder is missing, holds no images, or scanning fails.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}
    try:
        images_folder = Path(images_folder)
        if not images_folder.exists():
            print(f"图片文件夹不存在: {images_folder}")
            return []

        # Compare suffixes case-insensitively so mixed-case names such as
        # ".Jpg" are found, and skip directories that merely look like images.
        # Sorting makes the result, and therefore the publish schedule that
        # callers assign by index, identical from run to run.
        all_images = sorted({
            entry.resolve()
            for entry in images_folder.iterdir()
            if entry.is_file() and entry.suffix.lower() in image_extensions
        })
        if not all_images:
            print(f"{images_folder} 中未找到图片文件")
            return []

        # Group by stem. The base must end in a non-digit so an all-digit
        # stem like "12" is not split into base "1" and number 2.
        image_groups = {}
        for image_path in all_images:
            filename = image_path.stem
            match = re.match(r'^(.*\D)(\d+)$', filename)
            if match:
                base_name, number = match.group(1), int(match.group(2))
            else:
                base_name, number = filename, 1
            image_groups.setdefault(base_name, []).append((number, image_path))

        result_groups = []
        for base_name, images in image_groups.items():
            # Stable sort: equal numbers keep their sorted-path order.
            images.sort(key=lambda item: item[0])
            image_paths = [path for _, path in images]
            if len(image_paths) == 1:
                print(f"发现单图: {base_name} - {image_paths[0].name}")
            else:
                print(f"发现多图组: {base_name} - {len(image_paths)} 张图片")
                for i, path in enumerate(image_paths, 1):
                    print(f" {i}. {path.name}")
            result_groups.append({
                'base_name': base_name,
                'image_paths': image_paths,
                'count': len(image_paths),
                'type': 'multi' if len(image_paths) > 1 else 'single',
            })
        return result_groups
    except Exception as e:
        # Best-effort scan: report the problem and let the caller treat it
        # as "no images".
        print(f"处理图片文件夹时出错: {e}")
        return []
def _parse_tags_line(tags_line):
    """Split one stripped tag line into a list of tag strings.

    A line starting with ``#`` is read as whitespace-separated hashtags
    (``"#a #b"`` gives ``['a', 'b']``). Any other line is read as a
    comma-separated list; both the ASCII comma and the fullwidth comma
    (U+FF0C) are accepted as separators.
    """
    if tags_line.startswith('#'):
        return [
            token[1:]
            for token in tags_line.split()
            if token.startswith('#') and token[1:]
        ]
    # The original called replace('', ','), which inserts a comma between
    # every character and shreds each tag into single characters.
    normalized = tags_line.replace('\uff0c', ',')
    return [tag.strip() for tag in normalized.split(',') if tag.strip()]


@async_retry(timeout=30, max_retries=3)
async def get_image_metadata(base_name, images_folder):
    """Read title, tags, location and body text for one image group.

    The metadata is read from ``<images_folder>/<base_name>.txt``:

    * line 1: title
    * line 2: tags, as hashtags or a comma-separated list
    * line 3: location (optional, may be blank)
    * line 4 onwards: body text, with leading and trailing blank lines dropped

    Args:
        base_name: Group name; also the stem of the metadata file.
        images_folder: Folder containing the metadata file.

    Returns:
        A tuple ``(title, tags, location, content)``. ``title`` falls back
        to ``base_name`` when the file is missing, empty, or has a blank
        first line. ``tags`` defaults to ``['生活', '分享']`` when there is
        no tag line. ``location`` and ``content`` are ``None`` when absent.
        On any read error the same defaults are returned.
    """
    try:
        txt_file = Path(images_folder) / f"{base_name}.txt"
        if not txt_file.exists():
            return base_name, ['生活', '分享'], None, None

        # 'utf-8-sig' reads plain UTF-8 and also drops a leading BOM, which
        # would otherwise end up as the first character of the title.
        async with aiofiles.open(txt_file, 'r', encoding='utf-8-sig') as f:
            lines = await f.readlines()

        # A blank title line falls back to the group name, as an empty file does.
        title = (lines[0].strip() if lines else '') or base_name

        if len(lines) >= 2:
            tags = _parse_tags_line(lines[1].strip())
        else:
            tags = ['生活', '分享']

        location = None
        if len(lines) >= 3:
            location = lines[2].strip() or None

        content = None
        if len(lines) >= 4:
            # Right-strip every line, then trim blank lines at both ends;
            # interior blank lines and leading indentation are kept.
            body = '\n'.join(line.rstrip() for line in lines[3:]).strip('\n')
            content = body or None

        return title, tags, location, content
    except Exception as e:
        print(f"读取元数据失败: {e}")
        return base_name, ['生活', '分享'], None, None
async def main():
    """Upload every image group found in ``BASE_DIR/images``.

    Steps:
        1. Check that the images folder and the account file
           ``BASE_DIR/cookies/xiaohongshu_uploader/account.json`` exist.
        2. Group the images with ``get_image_groups_from_folder`` and print
           a summary.
        3. Request one publish time per group from
           ``generate_schedule_time_next_day``.
        4. Validate the login state with ``xiaohongshu_setup``.
        5. Upload each group through ``XiaoHongShuImage``, pausing 3-5 s
           after a success and 5-10 s after a failure.

    A failing group is reported and skipped. Any other error is printed and
    ends the run without raising.
    """
    print("🎯 小红书图文上传工具 - 智能适配单图/多图")
    print("=" * 50)
    try:
        # Wrap BASE_DIR in Path before joining so both paths are built the
        # same way and a plain-string BASE_DIR also works.
        base_dir = Path(BASE_DIR)
        images_folder = base_dir / "images"
        account_file = base_dir / "cookies" / "xiaohongshu_uploader" / "account.json"

        if not images_folder.exists():
            print(f"❌ 图片文件夹不存在: {images_folder}")
            print("请创建images文件夹并放入要上传的图片文件")
            return
        if not account_file.exists():
            print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
            return

        print("🔍 正在扫描图片文件...")
        image_groups = get_image_groups_from_folder(images_folder)
        if not image_groups:
            print("❌ 未找到任何图片文件")
            print("支持的格式: jpg, jpeg, png, webp, bmp")
            return

        total_groups = len(image_groups)
        single_count = sum(1 for group in image_groups if group['type'] == 'single')
        multi_count = sum(1 for group in image_groups if group['type'] == 'multi')
        total_images = sum(group['count'] for group in image_groups)
        print("\n📊 扫描结果:")
        print(f" • 总图文数: {total_groups}")
        print(f" • 单图图文: {single_count}")
        print(f" • 多图图文: {multi_count}")
        print(f" • 总图片数: {total_images}")

        print("\n⏰ 生成发布时间表...")
        # NOTE(review): presumably one post per day at 16:00; confirm
        # against utils.files_times.generate_schedule_time_next_day.
        publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16])

        print("🔐 验证登录状态...")
        cookie_setup = await xiaohongshu_setup(account_file, handle=False)
        if not cookie_setup:
            print("❌ Cookie验证失败请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
            return
        print("✅ 登录状态验证成功")

        print("\n🚀 开始上传图文...")
        print("=" * 50)
        failed_groups = []
        for index, group in enumerate(image_groups):
            # Bound before the try block so the error handler can always
            # name the group it is reporting.
            base_name = group.get('base_name', f"#{index + 1}")
            try:
                image_paths = group['image_paths']
                image_count = group['count']
                group_type = group['type']

                title, tags, location, content = await get_image_metadata(base_name, images_folder)
                print(f"\n📝 第 {index + 1}/{total_groups} 个图文")
                print(f" 类型: {'🖼️ 单图' if group_type == 'single' else '🖼️ ×' + str(image_count) + ' 多图'}")
                print(f" 名称: {base_name}")
                print(f" 标题: {title}")
                print(f" 标签: {', '.join(tags)}")
                print(f" 地点: {location if location else '未设置'}")
                print(f" 正文: {len(content) if content else 0} 字符")
                print(f" 发布: {publish_datetimes[index]}")

                app = XiaoHongShuImage(
                    title=title,
                    image_paths=[str(path) for path in image_paths],
                    tags=tags,
                    publish_date=publish_datetimes[index],
                    account_file=account_file,
                    location=location,
                    content=content,
                    headless=False,
                    use_enhanced_typing=True,  # enable the enhanced typing simulation
                )

                print(" 🔄 正在上传...")
                await app.main()
                type_desc = "单图" if group_type == 'single' else f"{image_count}张图"
                print(f" ✅ 图文《{title}》({type_desc}) 上传完成")
                # Random gap between posts to avoid uploading too frequently.
                await asyncio.sleep(random.uniform(3, 5))
            except Exception as e:
                failed_groups.append(base_name)
                print(f" ❌ 上传图文组 {base_name} 时出错: {e}")
                # Wait longer after a failure before trying the next group.
                await asyncio.sleep(random.uniform(5, 10))

        print("\n🎉 所有图文上传完成!")
        print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片")
        if failed_groups:
            # Make partial failure visible in the final summary.
            print(f"⚠️ 其中 {len(failed_groups)} 个图文组上传失败: {', '.join(failed_groups)}")
        print("=" * 50)
    except Exception as e:
        print(f"程序执行出错: {e}")
# Script entry point: run the async upload workflow when executed directly.
if __name__ == "__main__":
    asyncio.run(main())