# -*- coding: utf-8 -*- # 小红书图文上传脚本 - 智能适配单图和多图 import sys import os from pathlib import Path # 获取当前脚本所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 项目根目录是当前目录的上一级(因为examples目录和conf.py同级) project_root = os.path.dirname(current_dir) # 将项目根目录添加到系统路径 sys.path.append(project_root) import asyncio import re from datetime import datetime, timedelta import random import aiofiles from conf import BASE_DIR from uploader.xiaohongshu_uploader.main import XiaoHongShuImage, xiaohongshu_setup from utils.files_times import generate_schedule_time_next_day from utils.human_typing_wrapper import HumanTypingWrapper from utils.enhanced_human_typing import EnhancedHumanTypingSimulator from utils.network import async_retry # 增强版人类输入配置 ENHANCED_CONFIG = { # 人类状态模拟 'energy_level': random.uniform(0.7, 1.0), 'typing_proficiency': random.uniform(0.6, 0.9), 'emotion_state': random.uniform(0.8, 1.0), # 错误处理 'base_error_rate': random.uniform(0.02, 0.05), 'error_correction_speed': random.uniform(0.3, 0.8), # 速度控制 'speed_variance': random.uniform(0.1, 0.2), 'burst_speed_probability': 0.1 } # 保留原有配置作为备用 HUMAN_CONFIG = { 'min_typing_speed': 5, 'max_typing_speed': 15, 'pause_probability': 0.1, 'chunk_input': True, 'max_chunk_length': 100, 'fatigue_effect': False } class XiaohongshuImageUploader: def __init__(self, page=None, use_enhanced=True): if page: if use_enhanced: # 使用增强版输入模拟器 self.human_typer = EnhancedHumanTypingSimulator(page) self.use_enhanced = True else: # 使用原版输入模拟器作为备用 self.human_typer = HumanTypingWrapper(page, HUMAN_CONFIG) self.use_enhanced = False @async_retry(timeout=60, max_retries=3) async def fill_form(self, selector, text, clear_first=True): """增强版人性化填写表单""" try: if self.use_enhanced: # 使用增强版输入方法 success = await self.human_typer.type_text(text, selector) if success: await asyncio.sleep(random.uniform(0.5, 1.0)) return True return False else: # 使用原版输入方法作为备用 await self.human_typer.type_text_human(selector, text, clear_first) await asyncio.sleep(random.uniform(0.5, 1.0)) return True except Exception as e: print(f"填写表单失败: {e}") return False @async_retry(timeout=30, max_retries=3) async def process_tags(self, tags_line): """处理标签,使用增强版输入""" tags = [] if tags_line.startswith('#'): for tag in tags_line.split(): tag = tag.strip() if tag and tag.startswith('#'): tag_content = tag[1:].strip() if tag_content: tags.append(tag_content) # 更自然的处理间隔 await asyncio.sleep(random.uniform(0.2, 0.5)) return tags def get_image_groups_from_folder(images_folder): """从文件夹中智能获取图片组""" try: images_folder = Path(images_folder) if not images_folder.exists(): print(f"图片文件夹不存在: {images_folder}") return [] # 获取所有图片文件 image_extensions = ['.jpg', '.jpeg', '.png', '.webp', '.bmp'] all_images_set = set() for ext in image_extensions: for img in images_folder.glob(f"*{ext.lower()}"): all_images_set.add(img.resolve()) for img in images_folder.glob(f"*{ext.upper()}"): all_images_set.add(img.resolve()) all_images = list(all_images_set) if not all_images: print(f"在 {images_folder} 中未找到图片文件") return [] # 按文件名分组 image_groups = {} for image_path in all_images: filename = image_path.stem match = re.match(r'^(.+?)(\d+)$', filename) if match: base_name = match.group(1) number = int(match.group(2)) if base_name not in image_groups: image_groups[base_name] = [] image_groups[base_name].append((number, image_path)) else: if filename not in image_groups: image_groups[filename] = [] image_groups[filename].append((1, image_path)) # 整理分组结果 result_groups = [] for base_name, images in image_groups.items(): images.sort(key=lambda x: x[0]) image_paths = [img[1] for img in images] if len(image_paths) == 1: print(f"发现单图: {base_name} - {image_paths[0].name}") else: print(f"发现多图组: {base_name} - {len(image_paths)} 张图片") for i, path in enumerate(image_paths, 1): print(f" {i}. {path.name}") result_groups.append({ 'base_name': base_name, 'image_paths': image_paths, 'count': len(image_paths), 'type': 'multi' if len(image_paths) > 1 else 'single' }) return result_groups except Exception as e: print(f"处理图片文件夹时出错: {e}") return [] @async_retry(timeout=30, max_retries=3) async def get_image_metadata(base_name, images_folder): """获取图文元数据,添加重试机制""" try: txt_file = Path(images_folder) / f"{base_name}.txt" if txt_file.exists(): async with aiofiles.open(txt_file, 'r', encoding='utf-8') as f: lines = await f.readlines() title = lines[0].strip() if len(lines) >= 1 else base_name # 处理标签 tags = [] if len(lines) >= 2: tags_line = lines[1].strip() if tags_line.startswith('#'): for tag in tags_line.split(): tag = tag.strip() if tag and tag.startswith('#'): tag_content = tag[1:].strip() if tag_content: tags.append(tag_content) else: tags = [tag.strip() for tag in tags_line.replace(',', ',').split(',') if tag.strip()] else: tags = ['生活', '分享'] # 处理地点 location = None if len(lines) >= 3: location_line = lines[2].strip() if location_line: location = location_line # 处理正文 content = None if len(lines) >= 4: content_lines = [line.rstrip() for line in lines[3:]] while content_lines and not content_lines[0]: content_lines.pop(0) while content_lines and not content_lines[-1]: content_lines.pop() if content_lines: content = '\n'.join(content_lines) else: title = base_name tags = ['生活', '分享'] location = None content = None return title, tags, location, content except Exception as e: print(f"读取元数据失败: {e}") return base_name, ['生活', '分享'], None, None async def main(): print("🎯 小红书图文上传工具 - 智能适配单图/多图") print("=" * 50) try: # 配置 images_folder = Path(BASE_DIR) / "images" account_file = Path(BASE_DIR / "cookies" / "xiaohongshu_uploader" / "account.json") # 检查文件夹和账号文件 if not images_folder.exists(): print(f"❌ 图片文件夹不存在: {images_folder}") print("请创建images文件夹并放入要上传的图片文件") return if not account_file.exists(): print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") return # 智能获取图片分组 print("🔍 正在扫描图片文件...") image_groups = get_image_groups_from_folder(images_folder) if not image_groups: print("❌ 未找到任何图片文件") print("支持的格式: jpg, jpeg, png, webp, bmp") return # 统计信息 total_groups = len(image_groups) single_count = sum(1 for group in image_groups if group['type'] == 'single') multi_count = sum(1 for group in image_groups if group['type'] == 'multi') total_images = sum(group['count'] for group in image_groups) print(f"\n📊 扫描结果:") print(f" • 总图文数: {total_groups} 个") print(f" • 单图图文: {single_count} 个") print(f" • 多图图文: {multi_count} 个") print(f" • 总图片数: {total_images} 张") # 生成定时发布时间 print(f"\n⏰ 生成发布时间表...") publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16]) # 检查cookie print("🔐 验证登录状态...") cookie_setup = await xiaohongshu_setup(account_file, handle=False) if not cookie_setup: print("❌ Cookie验证失败,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") return print("✅ 登录状态验证成功") # 创建上传器实例(使用增强版输入模拟器) uploader = XiaohongshuImageUploader(use_enhanced=True) # 逐个上传图文组 print(f"\n🚀 开始上传图文...") print("=" * 50) for index, group in enumerate(image_groups): try: base_name = group['base_name'] image_paths = group['image_paths'] image_count = group['count'] group_type = group['type'] # 获取图文信息 title, tags, location, content = await get_image_metadata(base_name, images_folder) print(f"\n📝 第 {index + 1}/{total_groups} 个图文") print(f" 类型: {'🖼️ 单图' if group_type == 'single' else '🖼️ ×' + str(image_count) + ' 多图'}") print(f" 名称: {base_name}") print(f" 标题: {title}") print(f" 标签: {', '.join(tags)}") print(f" 地点: {location if location else '未设置'}") print(f" 正文: {len(content) if content else 0} 字符") print(f" 发布: {publish_datetimes[index]}") # 创建图文上传实例(启用增强版输入) app = XiaoHongShuImage( title=title, image_paths=[str(path) for path in image_paths], tags=tags, publish_date=publish_datetimes[index], account_file=account_file, location=location, content=content, headless=False, use_enhanced_typing=True # 启用增强版输入 ) # 执行上传 print(f" 🔄 正在上传...") await app.main() type_desc = f"单图" if group_type == 'single' else f"{image_count}张图" print(f" ✅ 图文《{title}》({type_desc}) 上传完成") # 添加随机延迟,避免频繁上传 await asyncio.sleep(random.uniform(3, 5)) except Exception as e: print(f" ❌ 上传图文组 {base_name} 时出错: {e}") # 出错后等待较长时间再继续 await asyncio.sleep(random.uniform(5, 10)) continue print(f"\n🎉 所有图文上传完成!") print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片") print("=" * 50) except Exception as e: print(f"程序执行出错: {e}") if __name__ == '__main__': asyncio.run(main())