diff --git a/examples/upload_images_to_xiaohongshu.py b/examples/upload_images_to_xiaohongshu.py index 3e95c04..acd0c9c 100644 --- a/examples/upload_images_to_xiaohongshu.py +++ b/examples/upload_images_to_xiaohongshu.py @@ -14,250 +14,326 @@ sys.path.append(project_root) import asyncio import re from datetime import datetime, timedelta +import random +import aiofiles from conf import BASE_DIR from uploader.xiaohongshu_uploader.main import XiaoHongShuImage, xiaohongshu_setup from utils.files_times import generate_schedule_time_next_day +from utils.human_typing_wrapper import HumanTypingWrapper +from utils.enhanced_human_typing import EnhancedHumanTypingSimulator +from utils.network import async_retry +# 增强版人类输入配置 +ENHANCED_CONFIG = { + # 人类状态模拟 + 'energy_level': random.uniform(0.7, 1.0), + 'typing_proficiency': random.uniform(0.6, 0.9), + 'emotion_state': random.uniform(0.8, 1.0), + + # 错误处理 + 'base_error_rate': random.uniform(0.02, 0.05), + 'error_correction_speed': random.uniform(0.3, 0.8), + + # 速度控制 + 'speed_variance': random.uniform(0.1, 0.2), + 'burst_speed_probability': 0.1 +} + +# 保留原有配置作为备用 +HUMAN_CONFIG = { + 'min_typing_speed': 5, + 'max_typing_speed': 15, + 'pause_probability': 0.1, + 'chunk_input': True, + 'max_chunk_length': 100, + 'fatigue_effect': False +} + +class XiaohongshuImageUploader: + def __init__(self, page=None, use_enhanced=True): + if page: + if use_enhanced: + # 使用增强版输入模拟器 + self.human_typer = EnhancedHumanTypingSimulator(page) + self.use_enhanced = True + else: + # 使用原版输入模拟器作为备用 + self.human_typer = HumanTypingWrapper(page, HUMAN_CONFIG) + self.use_enhanced = False + + @async_retry(timeout=60, max_retries=3) + async def fill_form(self, selector, text, clear_first=True): + """增强版人性化填写表单""" + try: + if self.use_enhanced: + # 使用增强版输入方法 + success = await self.human_typer.type_text(text, selector) + if success: + await asyncio.sleep(random.uniform(0.5, 1.0)) + return True + return False + else: + # 使用原版输入方法作为备用 + await self.human_typer.type_text_human(selector, text, clear_first) + await asyncio.sleep(random.uniform(0.5, 1.0)) + return True + except Exception as e: + print(f"填写表单失败: {e}") + return False + + @async_retry(timeout=30, max_retries=3) + async def process_tags(self, tags_line): + """处理标签,使用增强版输入""" + tags = [] + if tags_line.startswith('#'): + for tag in tags_line.split(): + tag = tag.strip() + if tag and tag.startswith('#'): + tag_content = tag[1:].strip() + if tag_content: + tags.append(tag_content) + # 更自然的处理间隔 + await asyncio.sleep(random.uniform(0.2, 0.5)) + return tags def get_image_groups_from_folder(images_folder): - """ - 从文件夹中智能获取图片组 - 支持两种方式: - 1. 单独的图片文件(每个图片一个图文) - 2. 以数字结尾的图片组(如:旅行1.jpg, 旅行2.jpg, 旅行3.jpg -> 一个图文包含3张图) - """ - images_folder = Path(images_folder) - if not images_folder.exists(): - print(f"图片文件夹不存在: {images_folder}") - return [] - - # 获取所有图片文件 - image_extensions = ['.jpg', '.jpeg', '.png', '.webp', '.bmp'] - all_images_set = set() # 使用集合去重 - - for ext in image_extensions: - # 搜索小写扩展名 - for img in images_folder.glob(f"*{ext}"): - all_images_set.add(img.resolve()) # 使用绝对路径去重 - # 搜索大写扩展名 - for img in images_folder.glob(f"*{ext.upper()}"): - all_images_set.add(img.resolve()) # 使用绝对路径去重 - - all_images = list(all_images_set) # 转换回列表 - - if not all_images: - print(f"在 {images_folder} 中未找到图片文件") - return [] - - # 按文件名分组 - image_groups = {} - - for image_path in all_images: - filename = image_path.stem # 不包含扩展名的文件名 + """从文件夹中智能获取图片组""" + try: + images_folder = Path(images_folder) + if not images_folder.exists(): + print(f"图片文件夹不存在: {images_folder}") + return [] - # 检查文件名是否以数字结尾(如:旅行1, 美食2) - match = re.match(r'^(.+?)(\d+)$', filename) + # 获取所有图片文件 + image_extensions = ['.jpg', '.jpeg', '.png', '.webp', '.bmp'] + all_images_set = set() - if match: - # 有数字后缀,按基础名称分组 - base_name = match.group(1) - number = int(match.group(2)) + for ext in image_extensions: + for img in images_folder.glob(f"*{ext.lower()}"): + all_images_set.add(img.resolve()) + for img in images_folder.glob(f"*{ext.upper()}"): + all_images_set.add(img.resolve()) + + all_images = list(all_images_set) + + if not all_images: + print(f"在 {images_folder} 中未找到图片文件") + return [] + + # 按文件名分组 + image_groups = {} + + for image_path in all_images: + filename = image_path.stem + match = re.match(r'^(.+?)(\d+)$', filename) - if base_name not in image_groups: - image_groups[base_name] = [] - image_groups[base_name].append((number, image_path)) - else: - # 没有数字后缀,单独成组 - if filename not in image_groups: - image_groups[filename] = [] - image_groups[filename].append((1, image_path)) - - # 整理分组结果 - result_groups = [] - for base_name, images in image_groups.items(): - # 按数字排序 - images.sort(key=lambda x: x[0]) - image_paths = [img[1] for img in images] - - # 判断是单图还是多图 - if len(image_paths) == 1: - print(f"发现单图: {base_name} - {image_paths[0].name}") - else: - print(f"发现多图组: {base_name} - {len(image_paths)} 张图片") - for i, path in enumerate(image_paths, 1): - print(f" {i}. {path.name}") - - result_groups.append({ - 'base_name': base_name, - 'image_paths': image_paths, - 'count': len(image_paths), - 'type': 'multi' if len(image_paths) > 1 else 'single' - }) - - return result_groups - - -def get_image_metadata(base_name, images_folder): - """ - 根据基础名称获取图文元数据 - 查找对应的txt文件(如:旅行.txt 对应 旅行1.jpg, 旅行2.jpg 或单独的 旅行.jpg) - """ - txt_file = Path(images_folder) / f"{base_name}.txt" - - if txt_file.exists(): - with open(txt_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - - # 第一行:标题 - title = lines[0].strip() if len(lines) >= 1 else base_name - - # 第二行:标签 - if len(lines) >= 2: - tags_line = lines[1].strip() - # 智能识别标签格式 - if tags_line.startswith('#'): - # 空格分隔格式:#美食 #甜品 #生活 - tags = [] - for tag in tags_line.split(): - tag = tag.strip() - if tag and tag.startswith('#'): - tag_content = tag[1:].strip() # 移除#号 - if tag_content: - tags.append(tag_content) + if match: + base_name = match.group(1) + number = int(match.group(2)) + if base_name not in image_groups: + image_groups[base_name] = [] + image_groups[base_name].append((number, image_path)) else: - # 逗号分隔格式:美食,甜品,生活 或 美食,甜品,生活 - tags = [tag.strip() for tag in tags_line.replace(',', ',').split(',') if tag.strip()] + if filename not in image_groups: + image_groups[filename] = [] + image_groups[filename].append((1, image_path)) + + # 整理分组结果 + result_groups = [] + for base_name, images in image_groups.items(): + images.sort(key=lambda x: x[0]) + image_paths = [img[1] for img in images] + + if len(image_paths) == 1: + print(f"发现单图: {base_name} - {image_paths[0].name}") + else: + print(f"发现多图组: {base_name} - {len(image_paths)} 张图片") + for i, path in enumerate(image_paths, 1): + print(f" {i}. {path.name}") + + result_groups.append({ + 'base_name': base_name, + 'image_paths': image_paths, + 'count': len(image_paths), + 'type': 'multi' if len(image_paths) > 1 else 'single' + }) + + return result_groups + except Exception as e: + print(f"处理图片文件夹时出错: {e}") + return [] + +@async_retry(timeout=30, max_retries=3) +async def get_image_metadata(base_name, images_folder): + """获取图文元数据,添加重试机制""" + try: + txt_file = Path(images_folder) / f"{base_name}.txt" + + if txt_file.exists(): + async with aiofiles.open(txt_file, 'r', encoding='utf-8') as f: + lines = await f.readlines() + + title = lines[0].strip() if len(lines) >= 1 else base_name + + # 处理标签 + tags = [] + if len(lines) >= 2: + tags_line = lines[1].strip() + if tags_line.startswith('#'): + for tag in tags_line.split(): + tag = tag.strip() + if tag and tag.startswith('#'): + tag_content = tag[1:].strip() + if tag_content: + tags.append(tag_content) + else: + tags = [tag.strip() for tag in tags_line.replace(',', ',').split(',') if tag.strip()] + else: + tags = ['生活', '分享'] + + # 处理地点 + location = None + if len(lines) >= 3: + location_line = lines[2].strip() + if location_line: + location = location_line + + # 处理正文 + content = None + if len(lines) >= 4: + content_lines = [line.rstrip() for line in lines[3:]] + while content_lines and not content_lines[0]: + content_lines.pop(0) + while content_lines and not content_lines[-1]: + content_lines.pop() + + if content_lines: + content = '\n'.join(content_lines) else: + title = base_name tags = ['生活', '分享'] - - # 第三行:地点(可选) - location = None - if len(lines) >= 3: - location_line = lines[2].strip() - if location_line: # 只有非空才设置地点 - location = location_line - - # 第四行及以后:正文内容(可选) - content = None - if len(lines) >= 4: - # 从第四行开始的所有内容作为正文 - content_lines = [line.rstrip() for line in lines[3:]] - # 移除开头和结尾的空行 - while content_lines and not content_lines[0]: - content_lines.pop(0) - while content_lines and not content_lines[-1]: - content_lines.pop() - - if content_lines: - content = '\n'.join(content_lines) - else: - # 没有对应的txt文件,使用默认值 - title = base_name - tags = ['生活', '分享'] - location = None - content = None - - return title, tags, location, content + location = None + content = None + + return title, tags, location, content + except Exception as e: + print(f"读取元数据失败: {e}") + return base_name, ['生活', '分享'], None, None - -if __name__ == '__main__': +async def main(): print("🎯 小红书图文上传工具 - 智能适配单图/多图") print("=" * 50) - # 配置 - images_folder = Path(BASE_DIR) / "images" - account_file = Path(BASE_DIR / "cookies" / "xiaohongshu_uploader" / "account.json") - - # 检查文件夹和账号文件 - if not images_folder.exists(): - print(f"❌ 图片文件夹不存在: {images_folder}") - print("请创建images文件夹并放入要上传的图片文件") - exit(1) - - if not account_file.exists(): - print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") - exit(1) - - # 智能获取图片分组 - print("🔍 正在扫描图片文件...") - image_groups = get_image_groups_from_folder(images_folder) - - if not image_groups: - print("❌ 未找到任何图片文件") - print("支持的格式: jpg, jpeg, png, webp, bmp") - exit(1) - - # 统计信息 - total_groups = len(image_groups) - single_count = sum(1 for group in image_groups if group['type'] == 'single') - multi_count = sum(1 for group in image_groups if group['type'] == 'multi') - total_images = sum(group['count'] for group in image_groups) - - print(f"\n📊 扫描结果:") - print(f" • 总图文数: {total_groups} 个") - print(f" • 单图图文: {single_count} 个") - print(f" • 多图图文: {multi_count} 个") - print(f" • 总图片数: {total_images} 张") - - # 生成定时发布时间(每天下午4点发布1个图文) - print(f"\n⏰ 生成发布时间表...") - publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16]) - - # 检查cookie - print("🔐 验证登录状态...") - cookie_setup = asyncio.run(xiaohongshu_setup(account_file, handle=False)) - if not cookie_setup: - print("❌ Cookie验证失败,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") - exit(1) - print("✅ 登录状态验证成功") - - # 逐个上传图文组 - print(f"\n🚀 开始上传图文...") - print("=" * 50) - - for index, group in enumerate(image_groups): - try: - base_name = group['base_name'] - image_paths = group['image_paths'] - image_count = group['count'] - group_type = group['type'] - - # 获取图文信息 - title, tags, location, content = get_image_metadata(base_name, images_folder) - - print(f"\n📝 第 {index + 1}/{total_groups} 个图文") - print(f" 类型: {'🖼️ 单图' if group_type == 'single' else '🖼️ ×' + str(image_count) + ' 多图'}") - print(f" 名称: {base_name}") - print(f" 标题: {title}") - print(f" 标签: {', '.join(tags)}") - print(f" 地点: {location if location else '未设置'}") - print(f" 正文: {len(content) if content else 0} 字符") - print(f" 发布: {publish_datetimes[index]}") - - # 创建图文上传实例(自动适配单图/多图) - app = XiaoHongShuImage( - title=title, - image_paths=[str(path) for path in image_paths], # 自动适配单张或多张图片 - tags=tags, - publish_date=publish_datetimes[index], - account_file=account_file, - location=location, - content=content, - headless=False - ) - - # 执行上传 - print(f" 🔄 正在上传...") - asyncio.run(app.main(), debug=False) - - type_desc = f"单图" if group_type == 'single' else f"{image_count}张图" - print(f" ✅ 图文《{title}》({type_desc}) 上传完成") - - except Exception as e: - print(f" ❌ 上传图文组 {base_name} 时出错: {e}") - continue - - print(f"\n🎉 所有图文上传完成!") - print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片") - print("=" * 50) + try: + # 配置 + images_folder = Path(BASE_DIR) / "images" + account_file = Path(BASE_DIR / "cookies" / "xiaohongshu_uploader" / "account.json") + + # 检查文件夹和账号文件 + if not images_folder.exists(): + print(f"❌ 图片文件夹不存在: {images_folder}") + print("请创建images文件夹并放入要上传的图片文件") + return + + if not account_file.exists(): + print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") + return + + # 智能获取图片分组 + print("🔍 正在扫描图片文件...") + image_groups = get_image_groups_from_folder(images_folder) + + if not image_groups: + print("❌ 未找到任何图片文件") + print("支持的格式: jpg, jpeg, png, webp, bmp") + return + + # 统计信息 + total_groups = len(image_groups) + single_count = sum(1 for group in image_groups if group['type'] == 'single') + multi_count = sum(1 for group in image_groups if group['type'] == 'multi') + total_images = sum(group['count'] for group in image_groups) + + print(f"\n📊 扫描结果:") + print(f" • 总图文数: {total_groups} 个") + print(f" • 单图图文: {single_count} 个") + print(f" • 多图图文: {multi_count} 个") + print(f" • 总图片数: {total_images} 张") + + # 生成定时发布时间 + print(f"\n⏰ 生成发布时间表...") + publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16]) + + # 检查cookie + print("🔐 验证登录状态...") + cookie_setup = await xiaohongshu_setup(account_file, handle=False) + if not cookie_setup: + print("❌ Cookie验证失败,请先运行 get_xiaohongshu_cookie.py 获取登录凭证") + return + print("✅ 登录状态验证成功") + + # 创建上传器实例(使用增强版输入模拟器) + uploader = XiaohongshuImageUploader(use_enhanced=True) + + # 逐个上传图文组 + print(f"\n🚀 开始上传图文...") + print("=" * 50) + + for index, group in enumerate(image_groups): + try: + base_name = group['base_name'] + image_paths = group['image_paths'] + image_count = group['count'] + group_type = group['type'] + + # 获取图文信息 + title, tags, location, content = await get_image_metadata(base_name, images_folder) + + print(f"\n📝 第 {index + 1}/{total_groups} 个图文") + print(f" 类型: {'🖼️ 单图' if group_type == 'single' else '🖼️ ×' + str(image_count) + ' 多图'}") + print(f" 名称: {base_name}") + print(f" 标题: {title}") + print(f" 标签: {', '.join(tags)}") + print(f" 地点: {location if location else '未设置'}") + print(f" 正文: {len(content) if content else 0} 字符") + print(f" 发布: {publish_datetimes[index]}") + + # 创建图文上传实例(启用增强版输入) + app = XiaoHongShuImage( + title=title, + image_paths=[str(path) for path in image_paths], + tags=tags, + publish_date=publish_datetimes[index], + account_file=account_file, + location=location, + content=content, + headless=False, + use_enhanced_typing=True # 启用增强版输入 + ) + + # 执行上传 + print(f" 🔄 正在上传...") + await app.main() + + type_desc = f"单图" if group_type == 'single' else f"{image_count}张图" + print(f" ✅ 图文《{title}》({type_desc}) 上传完成") + + # 添加随机延迟,避免频繁上传 + await asyncio.sleep(random.uniform(3, 5)) + + except Exception as e: + print(f" ❌ 上传图文组 {base_name} 时出错: {e}") + # 出错后等待较长时间再继续 + await asyncio.sleep(random.uniform(5, 10)) + continue + + print(f"\n🎉 所有图文上传完成!") + print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片") + print("=" * 50) + + except Exception as e: + print(f"程序执行出错: {e}") + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/utils/enhanced_human_typing.py b/utils/enhanced_human_typing.py new file mode 100644 index 0000000..df7059b --- /dev/null +++ b/utils/enhanced_human_typing.py @@ -0,0 +1,232 @@ +import time +import random +import re +import asyncio +from typing import Dict, List, Optional + +class EnhancedHumanTypingSimulator: + def __init__(self, page=None): + # 保留原方案的简单配置 + self.base_config = { + 'min_typing_speed': 5, + 'max_typing_speed': 15, + 'pause_probability': 0.1, + 'chunk_input': True, + 'max_chunk_length': 50 + } + + # 新增高级特性配置 + self.advanced_config = { + # 人类状态模拟 + 'energy_level': random.uniform(0.7, 1.0), + 'typing_proficiency': random.uniform(0.6, 0.9), + 'emotion_state': random.uniform(0.8, 1.0), + + # 错误处理 + 'base_error_rate': random.uniform(0.02, 0.05), + 'error_correction_speed': random.uniform(0.3, 0.8), + + # 速度控制 + 'speed_variance': random.uniform(0.1, 0.2), + 'burst_speed_probability': 0.1 + } + + self.page = page + self.typing_session = { + 'start_time': None, + 'chars_typed': 0, + 'last_break_time': time.time() + } + + async def type_text(self, text: str, selector: str = None) -> bool: + """增强版的文本输入方法""" + try: + if selector: + # 等待并点击元素 + await self._prepare_input(selector) + + # 初始化会话 + self.typing_session['start_time'] = time.time() + + # 智能分段 + chunks = self._smart_split_text(text) + + for chunk in chunks: + # 获取当前状态 + current_state = self._get_current_state() + + # 输入当前段落 + await self._type_chunk(chunk, current_state) + + # 段落间自然停顿 + await self._natural_pause(current_state) + + return True + + except Exception as e: + print(f"输入文本时出错: {e}") + return False + + def _smart_split_text(self, text: str) -> List[str]: + """智能文本分段""" + paragraphs = text.split('\n') + chunks = [] + + for para in paragraphs: + if len(para) <= self.base_config['max_chunk_length']: + if para.strip(): + chunks.append(para) + continue + + sentences = re.split(r'([。!?,:;])', para) + current_chunk = '' + + for sent in sentences: + if len(current_chunk) + len(sent) < self.base_config['max_chunk_length']: + current_chunk += sent + else: + if current_chunk.strip(): + chunks.append(current_chunk) + current_chunk = sent + + if current_chunk.strip(): + chunks.append(current_chunk) + + return chunks + + def _get_current_state(self) -> Dict: + """获取当前输入状态""" + typing_duration = time.time() - self.typing_session['start_time'] + fatigue = min(typing_duration / 300, 0.7) + + self.advanced_config['energy_level'] *= (1 - fatigue * 0.1) + self.advanced_config['emotion_state'] *= random.uniform(0.98, 1.02) + + return { + 'energy_level': max(0.3, self.advanced_config['energy_level']), + 'emotion_state': max(0.4, min(1.0, self.advanced_config['emotion_state'])), + 'typing_proficiency': self.advanced_config['typing_proficiency'], + 'current_error_rate': self._calculate_error_rate(fatigue) + } + + async def _type_chunk(self, chunk: str, state: Dict): + """输入文本块""" + for char in chunk: + typing_speed = self._calculate_typing_speed(state) + + if random.random() < state['current_error_rate']: + await self._handle_typing_error(char, state) + else: + await self._type_char(char, typing_speed) + + self.typing_session['chars_typed'] += 1 + await self._micro_pause(state) + + def _calculate_typing_speed(self, state: Dict) -> float: + """计算实时打字速度""" + base_speed = random.uniform( + self.base_config['min_typing_speed'], + self.base_config['max_typing_speed'] + ) + + speed = base_speed * ( + 0.7 + state['energy_level'] * 0.3 + + state['emotion_state'] * 0.2 + + state['typing_proficiency'] * 0.3 + ) + + speed *= random.uniform( + 1 - self.advanced_config['speed_variance'], + 1 + self.advanced_config['speed_variance'] + ) + + return speed + + def _calculate_error_rate(self, fatigue: float) -> float: + """计算当前错误率""" + base_rate = self.advanced_config['base_error_rate'] + error_rate = base_rate * (1 + fatigue) + error_rate *= random.uniform(0.8, 1.2) + return min(error_rate, 0.15) + + async def _handle_typing_error(self, char: str, state: Dict): + """处理打字错误""" + error_types = ['typo', 'double_hit', 'delay'] + error_type = random.choice(error_types) + + if error_type == 'typo': + wrong_char = self._get_similar_char(char) + await self._type_char(wrong_char, self._calculate_typing_speed(state)) + await asyncio.sleep(random.uniform(0.2, 0.5)) + await self._press_key("Backspace") + await self._type_char(char, self._calculate_typing_speed(state)) + + elif error_type == 'double_hit': + await self._type_char(char, self._calculate_typing_speed(state)) + await self._type_char(char, self._calculate_typing_speed(state)) + await asyncio.sleep(random.uniform(0.1, 0.3)) + await self._press_key("Backspace") + + else: # delay + await asyncio.sleep(random.uniform(0.3, 0.8)) + await self._type_char(char, self._calculate_typing_speed(state)) + + async def _natural_pause(self, state: Dict): + """自然停顿""" + base_pause = random.uniform(0.5, 1.5) + + if state['energy_level'] < 0.5: + base_pause *= 1.3 + if state['emotion_state'] < 0.6: + base_pause *= 1.2 + + await asyncio.sleep(base_pause * random.uniform(0.8, 1.2)) + + async def _micro_pause(self, state: Dict): + """字符间的微小停顿""" + pause_time = random.uniform(0.05, 0.15) + if state['energy_level'] < 0.5: + pause_time *= 1.2 + await asyncio.sleep(pause_time) + + def _get_similar_char(self, char: str) -> str: + """获取相似字符""" + similar_chars = { + '的': '地得', + '了': '着啦', + '和': '与跟', + '我': '我我', + '是': '市师', + '在': '再在', + '有': '又有', + '都': '都读', + '好': '号毫' + } + return random.choice(similar_chars.get(char, char + char)) + + async def _prepare_input(self, selector: str): + """准备输入""" + try: + await self.page.wait_for_selector(selector, timeout=5000) + await self.page.click(selector) + await asyncio.sleep(random.uniform(0.3, 0.8)) + except Exception as e: + print(f"准备输入失败: {e}") + raise + + async def _type_char(self, char: str, speed: float): + """输入单个字符""" + try: + delay = 1000 / speed # 转换为毫秒 + await self.page.keyboard.type(char, delay=delay) + except Exception as e: + print(f"输入字符失败: {e}") + raise + + async def _press_key(self, key: str): + """按键操作""" + try: + await self.page.keyboard.press(key) + except Exception as e: + print(f"按键操作失败: {e}") + raise