autoUpload/examples/upload_images_to_xiaohongshu.py

264 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# 小红书图文上传脚本 - 智能适配单图和多图
import sys
import os
from pathlib import Path
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# The project root is one level above this directory, because the examples/ directory sits alongside conf.py
project_root = os.path.dirname(current_dir)
# 将项目根目录添加到系统路径
sys.path.append(project_root)
import asyncio
import re
from datetime import datetime, timedelta
from conf import BASE_DIR
from uploader.xiaohongshu_uploader.main import XiaoHongShuImage, xiaohongshu_setup
from utils.files_times import generate_schedule_time_next_day
def get_image_groups_from_folder(images_folder):
    """
    Scan a folder and group its image files into posts.

    Two layouts are supported:
    1. Stand-alone images: each file becomes its own single-image post.
    2. Numbered series: files whose names differ only in a trailing number
       (trip1.jpg, trip2.jpg, trip3.jpg) become one multi-image post,
       ordered by that number.

    Args:
        images_folder: Directory to scan (str or Path). Sub-directories are
            not searched.

    Returns:
        A list of dicts, one per post, with the keys ``base_name`` (group
        name), ``image_paths`` (resolved ``Path`` objects in display order),
        ``count`` (number of images) and ``type`` (``'single'`` or
        ``'multi'``). The list is empty when the folder does not exist or
        holds no images. Groups are returned in a stable, name-sorted order.
    """
    images_folder = Path(images_folder)
    if not images_folder.exists():
        print(f"图片文件夹不存在: {images_folder}")
        return []

    image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}
    # Compare the lower-cased suffix so mixed-case extensions (".Jpg") are
    # found as well; resolving into a set removes duplicate hits for one file.
    unique_images = {
        entry.resolve()
        for entry in images_folder.glob('*')
        if entry.is_file() and entry.suffix.lower() in image_extensions
    }
    # Sets have no defined order; sort so group order (and therefore the
    # publish slot each group receives) is the same on every run.
    all_images = sorted(unique_images)
    if not all_images:
        print(f"{images_folder} 中未找到图片文件")
        return []

    # The base name must end in a non-digit character. This keeps all-numeric
    # stems such as "12" and "13" as separate posts instead of merging them
    # into a bogus group named "1".
    series_pattern = re.compile(r'^(.*?\D)(\d+)$')

    image_groups = {}
    for image_path in all_images:
        filename = image_path.stem  # file name without its extension
        match = series_pattern.match(filename)
        if match:
            # Numbered file: group by the part before the trailing number.
            base_name = match.group(1)
            number = int(match.group(2))
        else:
            # No trailing number: the file forms (or joins) its own group.
            base_name = filename
            number = 1
        image_groups.setdefault(base_name, []).append((number, image_path))

    result_groups = []
    for base_name, images in image_groups.items():
        # Order the images inside a group by their trailing number.
        images.sort(key=lambda item: item[0])
        image_paths = [path for _, path in images]

        if len(image_paths) == 1:
            print(f"发现单图: {base_name} - {image_paths[0].name}")
        else:
            print(f"发现多图组: {base_name} - {len(image_paths)} 张图片")
            for i, path in enumerate(image_paths, 1):
                print(f" {i}. {path.name}")

        result_groups.append({
            'base_name': base_name,
            'image_paths': image_paths,
            'count': len(image_paths),
            'type': 'multi' if len(image_paths) > 1 else 'single'
        })
    return result_groups
def get_image_metadata(base_name, images_folder):
    """
    Load the title, tags, location and body text for one image group.

    The metadata is read from ``<base_name>.txt`` inside ``images_folder``
    (for example ``trip.txt`` for ``trip1.jpg``/``trip2.jpg`` or a lone
    ``trip.jpg``). The file layout is line based:

    * line 1: title
    * line 2: tags, either ``#a #b #c`` (whitespace separated) or
      ``a,b,c`` (ASCII or fullwidth commas)
    * line 3: location (optional; blank means none)
    * line 4 onward: body text (optional; leading and trailing blank lines
      are dropped)

    Args:
        base_name: Group name used to locate the ``.txt`` file.
        images_folder: Folder that holds the images and the ``.txt`` file.

    Returns:
        A ``(title, tags, location, content)`` tuple. ``title`` falls back to
        ``base_name`` and ``tags`` to a default pair when the file or the
        corresponding line is missing; ``location`` and ``content`` are
        ``None`` when absent.
    """
    txt_file = Path(images_folder) / f"{base_name}.txt"
    if not txt_file.exists():
        # No metadata file: fall back to defaults.
        return base_name, ['生活', '分享'], None, None

    # utf-8-sig also reads plain UTF-8, but strips the BOM some Windows
    # editors prepend; otherwise the BOM would end up inside the title.
    with open(txt_file, 'r', encoding='utf-8-sig') as f:
        lines = f.readlines()

    # Line 1: title.
    title = lines[0].strip() if lines else base_name

    # Line 2: tags.
    if len(lines) >= 2:
        tags_line = lines[1].strip()
        if tags_line.startswith('#'):
            # Hashtag format: keep tokens that start with '#' and have text
            # after it; the leading '#' itself is removed.
            tags = [
                token[1:]
                for token in tags_line.split()
                if token.startswith('#') and token[1:]
            ]
        else:
            # Comma format. Normalise the fullwidth comma (U+FF0C) to an
            # ASCII comma before splitting. Replacing the empty string here
            # would insert a comma between every character.
            normalized = tags_line.replace('\uff0c', ',')
            tags = [tag.strip() for tag in normalized.split(',') if tag.strip()]
    else:
        tags = ['生活', '分享']

    # Line 3: optional location; a blank line means "not set".
    location = None
    if len(lines) >= 3:
        location = lines[2].strip() or None

    # Line 4 onward: optional body text. Trailing whitespace is removed from
    # every line, then blank lines at the start and end are dropped while
    # blank lines inside the body are kept.
    content = None
    if len(lines) >= 4:
        body = '\n'.join(line.rstrip() for line in lines[3:]).strip('\n')
        content = body or None

    return title, tags, location, content
def main():
    """
    Command-line entry point: upload every image group found in the
    ``images`` folder under ``BASE_DIR`` as a scheduled post.

    Steps: check that the images folder and the saved account file exist,
    group the images, build one publish time per group, validate the stored
    cookie, then upload the groups one by one. A failure in one group is
    reported and does not stop the remaining uploads. The process exits with
    status 1 when a precondition (folder, account file, images, cookie) is
    not met.
    """
    print("🎯 小红书图文上传工具 - 智能适配单图/多图")
    print("=" * 50)

    # Configuration. Both paths wrap BASE_DIR in Path() first so they work
    # whether conf.BASE_DIR is a str or a Path.
    images_folder = Path(BASE_DIR) / "images"
    account_file = Path(BASE_DIR) / "cookies" / "xiaohongshu_uploader" / "account.json"

    # Preconditions: images folder and saved login credentials.
    if not images_folder.exists():
        print(f"❌ 图片文件夹不存在: {images_folder}")
        print("请创建images文件夹并放入要上传的图片文件")
        sys.exit(1)
    if not account_file.exists():
        print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
        sys.exit(1)

    # Discover and group the images.
    print("🔍 正在扫描图片文件...")
    image_groups = get_image_groups_from_folder(images_folder)
    if not image_groups:
        print("❌ 未找到任何图片文件")
        print("支持的格式: jpg, jpeg, png, webp, bmp")
        sys.exit(1)

    # Summary statistics.
    total_groups = len(image_groups)
    single_count = sum(1 for group in image_groups if group['type'] == 'single')
    multi_count = sum(1 for group in image_groups if group['type'] == 'multi')
    total_images = sum(group['count'] for group in image_groups)
    print(f"\n📊 扫描结果:")
    print(f" • 总图文数: {total_groups}")
    print(f" • 单图图文: {single_count}")
    print(f" • 多图图文: {multi_count}")
    print(f" • 总图片数: {total_images}")

    # One publish time per group. Per the original note this is one post per
    # day at 16:00; the exact semantics live in generate_schedule_time_next_day.
    print(f"\n⏰ 生成发布时间表...")
    publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16])

    # Validate the stored cookie before starting any upload.
    print("🔐 验证登录状态...")
    cookie_setup = asyncio.run(xiaohongshu_setup(account_file, handle=False))
    if not cookie_setup:
        print("❌ Cookie验证失败请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
        sys.exit(1)
    print("✅ 登录状态验证成功")

    # Upload the groups one at a time.
    print(f"\n🚀 开始上传图文...")
    print("=" * 50)
    failed_groups = []  # names of groups whose upload raised
    for index, group in enumerate(image_groups):
        base_name = group['base_name']
        try:
            image_paths = group['image_paths']
            image_count = group['count']
            group_type = group['type']

            # Metadata comes from the optional <base_name>.txt file.
            title, tags, location, content = get_image_metadata(base_name, images_folder)

            if group_type == 'single':
                type_label = '🖼️ 单图'
                type_desc = "单图"
            else:
                type_label = f"🖼️ ×{image_count} 多图"
                type_desc = f"{image_count}张图"

            print(f"\n📝 第 {index + 1}/{total_groups} 个图文")
            print(f" 类型: {type_label}")
            print(f" 名称: {base_name}")
            print(f" 标题: {title}")
            print(f" 标签: {', '.join(tags)}")
            print(f" 地点: {location if location else '未设置'}")
            print(f" 正文: {len(content) if content else 0} 字符")
            print(f" 发布: {publish_datetimes[index]}")

            # The same uploader class is used for one image or several.
            app = XiaoHongShuImage(
                title=title,
                image_paths=[str(path) for path in image_paths],
                tags=tags,
                publish_date=publish_datetimes[index],
                account_file=account_file,
                location=location,
                content=content,
                headless=False
            )

            print(f" 🔄 正在上传...")
            asyncio.run(app.main(), debug=False)
            print(f" ✅ 图文《{title}》({type_desc}) 上传完成")
        except Exception as e:
            # Best effort: report the failure and move on to the next group.
            print(f" ❌ 上传图文组 {base_name} 时出错: {e}")
            failed_groups.append(base_name)
            continue

    # Only claim full success when every group actually uploaded.
    if failed_groups:
        print(f"\n⚠️ 上传结束, {len(failed_groups)} 个图文组失败: {', '.join(failed_groups)}")
    else:
        print(f"\n🎉 所有图文上传完成!")
    print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片")
    print("=" * 50)


if __name__ == '__main__':
    main()