Compare commits
3 Commits
de8fba8a77
...
43bae145b3
| Author | SHA1 | Date | |
|---|---|---|---|
| 43bae145b3 | |||
| 36e3940410 | |||
| 377e34c651 |
339
examples/upload_images_to_xiaohongshu.py
Normal file
339
examples/upload_images_to_xiaohongshu.py
Normal file
@ -0,0 +1,339 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# 小红书图文上传脚本 - 智能适配单图和多图
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# 获取当前脚本所在目录
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
# 项目根目录是当前目录的上一级(因为examples目录和conf.py同级)
|
||||
project_root = os.path.dirname(current_dir)
|
||||
# 将项目根目录添加到系统路径
|
||||
sys.path.append(project_root)
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
import random
|
||||
import aiofiles
|
||||
|
||||
from conf import BASE_DIR
|
||||
from uploader.xiaohongshu_uploader.main import XiaoHongShuImage, xiaohongshu_setup
|
||||
from utils.files_times import generate_schedule_time_next_day
|
||||
from utils.human_typing_wrapper import HumanTypingWrapper
|
||||
from utils.enhanced_human_typing import EnhancedHumanTypingSimulator
|
||||
from utils.network import async_retry
|
||||
|
||||
# 增强版人类输入配置
|
||||
ENHANCED_CONFIG = {
|
||||
# 人类状态模拟
|
||||
'energy_level': random.uniform(0.7, 1.0),
|
||||
'typing_proficiency': random.uniform(0.6, 0.9),
|
||||
'emotion_state': random.uniform(0.8, 1.0),
|
||||
|
||||
# 错误处理
|
||||
'base_error_rate': random.uniform(0.02, 0.05),
|
||||
'error_correction_speed': random.uniform(0.3, 0.8),
|
||||
|
||||
# 速度控制
|
||||
'speed_variance': random.uniform(0.1, 0.2),
|
||||
'burst_speed_probability': 0.1
|
||||
}
|
||||
|
||||
# 保留原有配置作为备用
|
||||
HUMAN_CONFIG = {
|
||||
'min_typing_speed': 5,
|
||||
'max_typing_speed': 15,
|
||||
'pause_probability': 0.1,
|
||||
'chunk_input': True,
|
||||
'max_chunk_length': 100,
|
||||
'fatigue_effect': False
|
||||
}
|
||||
|
||||
class XiaohongshuImageUploader:
|
||||
def __init__(self, page=None, use_enhanced=True):
|
||||
if page:
|
||||
if use_enhanced:
|
||||
# 使用增强版输入模拟器
|
||||
self.human_typer = EnhancedHumanTypingSimulator(page)
|
||||
self.use_enhanced = True
|
||||
else:
|
||||
# 使用原版输入模拟器作为备用
|
||||
self.human_typer = HumanTypingWrapper(page, HUMAN_CONFIG)
|
||||
self.use_enhanced = False
|
||||
|
||||
@async_retry(timeout=60, max_retries=3)
|
||||
async def fill_form(self, selector, text, clear_first=True):
|
||||
"""增强版人性化填写表单"""
|
||||
try:
|
||||
if self.use_enhanced:
|
||||
# 使用增强版输入方法
|
||||
success = await self.human_typer.type_text(text, selector)
|
||||
if success:
|
||||
await asyncio.sleep(random.uniform(0.5, 1.0))
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
# 使用原版输入方法作为备用
|
||||
await self.human_typer.type_text_human(selector, text, clear_first)
|
||||
await asyncio.sleep(random.uniform(0.5, 1.0))
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"填写表单失败: {e}")
|
||||
return False
|
||||
|
||||
@async_retry(timeout=30, max_retries=3)
|
||||
async def process_tags(self, tags_line):
|
||||
"""处理标签,使用增强版输入"""
|
||||
tags = []
|
||||
if tags_line.startswith('#'):
|
||||
for tag in tags_line.split():
|
||||
tag = tag.strip()
|
||||
if tag and tag.startswith('#'):
|
||||
tag_content = tag[1:].strip()
|
||||
if tag_content:
|
||||
tags.append(tag_content)
|
||||
# 更自然的处理间隔
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5))
|
||||
return tags
|
||||
|
||||
def get_image_groups_from_folder(images_folder):
|
||||
"""从文件夹中智能获取图片组"""
|
||||
try:
|
||||
images_folder = Path(images_folder)
|
||||
if not images_folder.exists():
|
||||
print(f"图片文件夹不存在: {images_folder}")
|
||||
return []
|
||||
|
||||
# 获取所有图片文件
|
||||
image_extensions = ['.jpg', '.jpeg', '.png', '.webp', '.bmp']
|
||||
all_images_set = set()
|
||||
|
||||
for ext in image_extensions:
|
||||
for img in images_folder.glob(f"*{ext.lower()}"):
|
||||
all_images_set.add(img.resolve())
|
||||
for img in images_folder.glob(f"*{ext.upper()}"):
|
||||
all_images_set.add(img.resolve())
|
||||
|
||||
all_images = list(all_images_set)
|
||||
|
||||
if not all_images:
|
||||
print(f"在 {images_folder} 中未找到图片文件")
|
||||
return []
|
||||
|
||||
# 按文件名分组
|
||||
image_groups = {}
|
||||
|
||||
for image_path in all_images:
|
||||
filename = image_path.stem
|
||||
match = re.match(r'^(.+?)(\d+)$', filename)
|
||||
|
||||
if match:
|
||||
base_name = match.group(1)
|
||||
number = int(match.group(2))
|
||||
if base_name not in image_groups:
|
||||
image_groups[base_name] = []
|
||||
image_groups[base_name].append((number, image_path))
|
||||
else:
|
||||
if filename not in image_groups:
|
||||
image_groups[filename] = []
|
||||
image_groups[filename].append((1, image_path))
|
||||
|
||||
# 整理分组结果
|
||||
result_groups = []
|
||||
for base_name, images in image_groups.items():
|
||||
images.sort(key=lambda x: x[0])
|
||||
image_paths = [img[1] for img in images]
|
||||
|
||||
if len(image_paths) == 1:
|
||||
print(f"发现单图: {base_name} - {image_paths[0].name}")
|
||||
else:
|
||||
print(f"发现多图组: {base_name} - {len(image_paths)} 张图片")
|
||||
for i, path in enumerate(image_paths, 1):
|
||||
print(f" {i}. {path.name}")
|
||||
|
||||
result_groups.append({
|
||||
'base_name': base_name,
|
||||
'image_paths': image_paths,
|
||||
'count': len(image_paths),
|
||||
'type': 'multi' if len(image_paths) > 1 else 'single'
|
||||
})
|
||||
|
||||
return result_groups
|
||||
except Exception as e:
|
||||
print(f"处理图片文件夹时出错: {e}")
|
||||
return []
|
||||
|
||||
@async_retry(timeout=30, max_retries=3)
|
||||
async def get_image_metadata(base_name, images_folder):
|
||||
"""获取图文元数据,添加重试机制"""
|
||||
try:
|
||||
txt_file = Path(images_folder) / f"{base_name}.txt"
|
||||
|
||||
if txt_file.exists():
|
||||
async with aiofiles.open(txt_file, 'r', encoding='utf-8') as f:
|
||||
lines = await f.readlines()
|
||||
|
||||
title = lines[0].strip() if len(lines) >= 1 else base_name
|
||||
|
||||
# 处理标签
|
||||
tags = []
|
||||
if len(lines) >= 2:
|
||||
tags_line = lines[1].strip()
|
||||
if tags_line.startswith('#'):
|
||||
for tag in tags_line.split():
|
||||
tag = tag.strip()
|
||||
if tag and tag.startswith('#'):
|
||||
tag_content = tag[1:].strip()
|
||||
if tag_content:
|
||||
tags.append(tag_content)
|
||||
else:
|
||||
tags = [tag.strip() for tag in tags_line.replace(',', ',').split(',') if tag.strip()]
|
||||
else:
|
||||
tags = ['生活', '分享']
|
||||
|
||||
# 处理地点
|
||||
location = None
|
||||
if len(lines) >= 3:
|
||||
location_line = lines[2].strip()
|
||||
if location_line:
|
||||
location = location_line
|
||||
|
||||
# 处理正文
|
||||
content = None
|
||||
if len(lines) >= 4:
|
||||
content_lines = [line.rstrip() for line in lines[3:]]
|
||||
while content_lines and not content_lines[0]:
|
||||
content_lines.pop(0)
|
||||
while content_lines and not content_lines[-1]:
|
||||
content_lines.pop()
|
||||
|
||||
if content_lines:
|
||||
content = '\n'.join(content_lines)
|
||||
else:
|
||||
title = base_name
|
||||
tags = ['生活', '分享']
|
||||
location = None
|
||||
content = None
|
||||
|
||||
return title, tags, location, content
|
||||
except Exception as e:
|
||||
print(f"读取元数据失败: {e}")
|
||||
return base_name, ['生活', '分享'], None, None
|
||||
|
||||
async def main():
|
||||
print("🎯 小红书图文上传工具 - 智能适配单图/多图")
|
||||
print("=" * 50)
|
||||
|
||||
try:
|
||||
# 配置
|
||||
images_folder = Path(BASE_DIR) / "images"
|
||||
account_file = Path(BASE_DIR / "cookies" / "xiaohongshu_uploader" / "account.json")
|
||||
|
||||
# 检查文件夹和账号文件
|
||||
if not images_folder.exists():
|
||||
print(f"❌ 图片文件夹不存在: {images_folder}")
|
||||
print("请创建images文件夹并放入要上传的图片文件")
|
||||
return
|
||||
|
||||
if not account_file.exists():
|
||||
print("❌ 账号文件不存在,请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
|
||||
return
|
||||
|
||||
# 智能获取图片分组
|
||||
print("🔍 正在扫描图片文件...")
|
||||
image_groups = get_image_groups_from_folder(images_folder)
|
||||
|
||||
if not image_groups:
|
||||
print("❌ 未找到任何图片文件")
|
||||
print("支持的格式: jpg, jpeg, png, webp, bmp")
|
||||
return
|
||||
|
||||
# 统计信息
|
||||
total_groups = len(image_groups)
|
||||
single_count = sum(1 for group in image_groups if group['type'] == 'single')
|
||||
multi_count = sum(1 for group in image_groups if group['type'] == 'multi')
|
||||
total_images = sum(group['count'] for group in image_groups)
|
||||
|
||||
print(f"\n📊 扫描结果:")
|
||||
print(f" • 总图文数: {total_groups} 个")
|
||||
print(f" • 单图图文: {single_count} 个")
|
||||
print(f" • 多图图文: {multi_count} 个")
|
||||
print(f" • 总图片数: {total_images} 张")
|
||||
|
||||
# 生成定时发布时间
|
||||
print(f"\n⏰ 生成发布时间表...")
|
||||
publish_datetimes = generate_schedule_time_next_day(total_groups, 1, daily_times=[16])
|
||||
|
||||
# 检查cookie
|
||||
print("🔐 验证登录状态...")
|
||||
cookie_setup = await xiaohongshu_setup(account_file, handle=False)
|
||||
if not cookie_setup:
|
||||
print("❌ Cookie验证失败,请先运行 get_xiaohongshu_cookie.py 获取登录凭证")
|
||||
return
|
||||
print("✅ 登录状态验证成功")
|
||||
|
||||
# 创建上传器实例(使用增强版输入模拟器)
|
||||
uploader = XiaohongshuImageUploader(use_enhanced=True)
|
||||
|
||||
# 逐个上传图文组
|
||||
print(f"\n🚀 开始上传图文...")
|
||||
print("=" * 50)
|
||||
|
||||
for index, group in enumerate(image_groups):
|
||||
try:
|
||||
base_name = group['base_name']
|
||||
image_paths = group['image_paths']
|
||||
image_count = group['count']
|
||||
group_type = group['type']
|
||||
|
||||
# 获取图文信息
|
||||
title, tags, location, content = await get_image_metadata(base_name, images_folder)
|
||||
|
||||
print(f"\n📝 第 {index + 1}/{total_groups} 个图文")
|
||||
print(f" 类型: {'🖼️ 单图' if group_type == 'single' else '🖼️ ×' + str(image_count) + ' 多图'}")
|
||||
print(f" 名称: {base_name}")
|
||||
print(f" 标题: {title}")
|
||||
print(f" 标签: {', '.join(tags)}")
|
||||
print(f" 地点: {location if location else '未设置'}")
|
||||
print(f" 正文: {len(content) if content else 0} 字符")
|
||||
print(f" 发布: {publish_datetimes[index]}")
|
||||
|
||||
# 创建图文上传实例(启用增强版输入)
|
||||
app = XiaoHongShuImage(
|
||||
title=title,
|
||||
image_paths=[str(path) for path in image_paths],
|
||||
tags=tags,
|
||||
publish_date=publish_datetimes[index],
|
||||
account_file=account_file,
|
||||
location=location,
|
||||
content=content,
|
||||
headless=False,
|
||||
use_enhanced_typing=True # 启用增强版输入
|
||||
)
|
||||
|
||||
# 执行上传
|
||||
print(f" 🔄 正在上传...")
|
||||
await app.main()
|
||||
|
||||
type_desc = f"单图" if group_type == 'single' else f"{image_count}张图"
|
||||
print(f" ✅ 图文《{title}》({type_desc}) 上传完成")
|
||||
|
||||
# 添加随机延迟,避免频繁上传
|
||||
await asyncio.sleep(random.uniform(3, 5))
|
||||
|
||||
except Exception as e:
|
||||
print(f" ❌ 上传图文组 {base_name} 时出错: {e}")
|
||||
# 出错后等待较长时间再继续
|
||||
await asyncio.sleep(random.uniform(5, 10))
|
||||
continue
|
||||
|
||||
print(f"\n🎉 所有图文上传完成!")
|
||||
print(f"📊 处理结果: {total_groups} 个图文组,{total_images} 张图片")
|
||||
print("=" * 50)
|
||||
|
||||
except Exception as e:
|
||||
print(f"程序执行出错: {e}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
@ -38,5 +38,5 @@ if __name__ == '__main__':
|
||||
# if thumbnail_path.exists():
|
||||
# app = XiaoHongShuVideo(title, file, tags, publish_datetimes[index], account_file, thumbnail_path=thumbnail_path)
|
||||
# else:
|
||||
app = XiaoHongShuVideo(title, file, tags, publish_datetimes[index], account_file, headless=True) # 推荐使用有头模式
|
||||
app = XiaoHongShuVideo(title, file, tags, publish_datetimes[index], account_file, headless=False) # 推荐使用有头模式
|
||||
asyncio.run(app.main(), debug=False)
|
||||
|
||||
231
images/README.md
Normal file
231
images/README.md
Normal file
@ -0,0 +1,231 @@
|
||||
# 小红书图文上传 - 图片文件夹
|
||||
|
||||
这个文件夹用于存放要上传到小红书的图片文件。
|
||||
|
||||
## 🎯 **支持的上传方式**
|
||||
- **单图上传**:每张图片单独发布一个图文
|
||||
- **多图上传**:多张图片组合成一个图文(最多9张)
|
||||
|
||||
## 📁 文件结构
|
||||
|
||||
```
|
||||
images/
|
||||
├── README.md # 说明文件
|
||||
├── 图片1.jpg # 图片文件
|
||||
├── 图片1.txt # 对应的标题和标签文件(可选)
|
||||
├── 图片2.png # 另一张图片
|
||||
├── 图片2.txt # 对应的标题和标签文件
|
||||
└── ...
|
||||
```
|
||||
|
||||
## 🖼️ 支持的图片格式
|
||||
|
||||
- **JPG/JPEG** - 推荐格式
|
||||
- **PNG** - 支持透明背景
|
||||
- **WEBP** - 现代格式,文件更小
|
||||
|
||||
## 📝 标题和标签配置
|
||||
|
||||
为每张图片创建同名的 `.txt` 文件来配置标题和标签:
|
||||
|
||||
### 文件格式
|
||||
```
|
||||
第一行:图文标题
|
||||
第二行:标签(支持两种格式)
|
||||
第三行:地点信息(可选,留空则不设置地理位置)
|
||||
第四行及以后:正文内容(可选,支持多行长文本)
|
||||
```
|
||||
|
||||
### 📋 **标签格式支持**
|
||||
|
||||
#### **格式1:逗号分隔**
|
||||
```
|
||||
美食,甜品,蛋糕,下午茶,生活
|
||||
```
|
||||
|
||||
#### **格式2:空格分隔(带#号)**
|
||||
```
|
||||
#美食 #甜品 #蛋糕 #下午茶 #生活
|
||||
```
|
||||
|
||||
**注意**:系统会自动识别格式并处理,两种格式效果相同。
|
||||
|
||||
### 🎯 **智能标签建议选择**
|
||||
|
||||
系统支持智能标签建议选择功能:
|
||||
|
||||
#### **选择策略**
|
||||
1. **精确匹配优先**:如果找到与输入标签完全一致的建议,优先选择
|
||||
2. **包含匹配备选**:如果没有精确匹配,选择包含关键词的相关建议
|
||||
3. **自动生成新标签**:如果没有任何匹配的建议,系统会自动生成新标签
|
||||
|
||||
#### **处理流程**
|
||||
```
|
||||
输入标签 → 等待建议加载 → 查找最佳匹配 → 选择建议或生成新标签
|
||||
```
|
||||
|
||||
#### **示例**
|
||||
- 输入 `#广州旅游` → 找到 `#广州旅游 16.8亿人浏览` → 自动选择
|
||||
- 输入 `#美食分享` → 找到 `#美食分享日常` → 选择相关建议
|
||||
- 输入 `#我的原创标签` → 无匹配建议 → 生成新标签
|
||||
|
||||
### 示例文件:`美食分享.txt`
|
||||
```
|
||||
今日美食推荐 - 超好吃的蛋糕
|
||||
美食,甜品,蛋糕,下午茶,生活
|
||||
北京市
|
||||
|
||||
今天发现了一家超棒的蛋糕店!🍰
|
||||
|
||||
这家店的招牌是巧克力慕斯蛋糕,
|
||||
口感丰富,甜而不腻,
|
||||
搭配他们家的手冲咖啡简直完美!
|
||||
|
||||
店里的装修也很有格调,
|
||||
很适合和朋友一起来聊天放松。
|
||||
下次还想再来尝试其他口味。
|
||||
|
||||
推荐给喜欢甜品的朋友们!
|
||||
你们有什么好吃的蛋糕店推荐吗?
|
||||
```
|
||||
|
||||
### 不设置地理位置的示例:`生活分享.txt`
|
||||
```
|
||||
今天的心情特别好
|
||||
生活,分享,心情
|
||||
|
||||
今天阳光明媚,心情特别好!
|
||||
和朋友一起度过了愉快的一天。
|
||||
```
|
||||
**注意**:第三行留空,系统会自动跳过地理位置设置。
|
||||
|
||||
## 📸 **多图上传功能**
|
||||
|
||||
### 文件命名规则
|
||||
对于多图上传,使用以下命名规则:
|
||||
```
|
||||
旅行1.jpg # 第1张图
|
||||
旅行2.jpg # 第2张图
|
||||
旅行3.jpg # 第3张图
|
||||
旅行.txt # 对应的文本文件
|
||||
```
|
||||
|
||||
### 多图示例:`旅行.txt`
|
||||
```
|
||||
三亚海边度假之旅
|
||||
旅行,度假,海边,三亚,美景
|
||||
三亚市
|
||||
|
||||
这次三亚之旅真的太棒了!🏖️
|
||||
|
||||
第一天:抵达三亚,入住海景酒店
|
||||
第二天:天涯海角,椰梦长廊漫步
|
||||
第三天:亚龙湾海滩,享受阳光沙滩
|
||||
|
||||
每一刻都是美好的回忆!
|
||||
```
|
||||
|
||||
### 使用脚本
|
||||
使用 `upload_images_to_xiaohongshu.py` - **智能适配单图和多图**
|
||||
```bash
|
||||
python examples/upload_images_to_xiaohongshu.py
|
||||
```
|
||||
|
||||
### 🤖 **智能适配规则**
|
||||
脚本会自动识别文件命名规则:
|
||||
|
||||
#### **单图模式**
|
||||
```
|
||||
美食.jpg ← 单独发布一个图文
|
||||
美食.txt ← 对应的文本文件
|
||||
```
|
||||
|
||||
#### **多图模式**
|
||||
```
|
||||
旅行1.jpg ┐
|
||||
旅行2.jpg ├─ 自动组合成一个图文
|
||||
旅行3.jpg ┘
|
||||
旅行.txt ← 对应的文本文件
|
||||
```
|
||||
|
||||
#### **混合模式**
|
||||
```
|
||||
美食.jpg ← 单图图文
|
||||
旅行1.jpg ┐
|
||||
旅行2.jpg ├─ 多图图文
|
||||
旅行3.jpg ┘
|
||||
生活.jpg ← 单图图文
|
||||
```
|
||||
**所有图片会被智能分组并按计划发布**
|
||||
|
||||
### 配置说明
|
||||
|
||||
#### 📍 地点信息(第三行)
|
||||
- **格式要求**:城市名(如"北京市"、"上海市"、"广州市")
|
||||
- **可选设置**:可以留空或不写
|
||||
- **自动处理**:如果不设置地点,系统将跳过位置设置
|
||||
|
||||
#### 📝 正文内容(第四行及以后)
|
||||
- **支持长文本**:可以写多行内容,支持换行
|
||||
- **内容丰富**:可以包含表情符号、问句、描述等
|
||||
- **自动处理**:如果不写正文,系统将使用标题作为默认内容
|
||||
- **格式保持**:会保持原有的换行和段落格式
|
||||
- **标签位置**:标签会自动添加在正文内容的后面,用空格分隔
|
||||
|
||||
## 🚀 使用方法
|
||||
|
||||
1. **准备图片**:将要上传的图片放入此文件夹
|
||||
2. **配置信息**:为每张图片创建对应的 `.txt` 文件(可选)
|
||||
3. **运行脚本**:执行 `python examples/upload_image_to_xiaohongshu.py`
|
||||
|
||||
### ⏰ 定时发布说明
|
||||
|
||||
- **默认设置**:每天下午4点发布1个图文
|
||||
- **自动排期**:多个图文会按天数顺序排期
|
||||
- **发布逻辑**:与视频发布保持一致的定时机制
|
||||
|
||||
## 📋 注意事项
|
||||
|
||||
- 如果没有 `.txt` 文件,将使用图片文件名作为标题
|
||||
- 标签可以用逗号或中文逗号分隔
|
||||
- 建议图片尺寸为正方形或竖屏比例
|
||||
- 单个图文最多支持9张图片
|
||||
|
||||
## 🔧 高级配置
|
||||
|
||||
### 单张图片上传
|
||||
```python
|
||||
from uploader.xiaohongshu_uploader.main import XiaoHongShuImage
|
||||
|
||||
app = XiaoHongShuImage(
|
||||
title="图文标题",
|
||||
image_paths=["path/to/image.jpg"],
|
||||
tags=["标签1", "标签2"],
|
||||
publish_date=0, # 0表示立即发布
|
||||
account_file="cookies/xiaohongshu_uploader/account.json",
|
||||
location="北京市" # 地点信息(可选)
|
||||
)
|
||||
```
|
||||
|
||||
### 多张图片上传
|
||||
```python
|
||||
app = XiaoHongShuImage(
|
||||
title="多图合集",
|
||||
image_paths=[
|
||||
"path/to/image1.jpg",
|
||||
"path/to/image2.jpg",
|
||||
"path/to/image3.jpg"
|
||||
],
|
||||
tags=["合集", "分享"],
|
||||
publish_date=0,
|
||||
account_file="cookies/xiaohongshu_uploader/account.json",
|
||||
location="上海市" # 地点信息(可选)
|
||||
)
|
||||
```
|
||||
|
||||
### 地点信息配置
|
||||
- **参数名称**: `location`
|
||||
- **数据类型**: 字符串或None
|
||||
- **示例值**: "北京市"、"上海市"、"广州市"、"深圳市"
|
||||
- **默认值**: None(不设置地点)
|
||||
- **注意事项**: 地点名称需要是小红书支持的有效地点
|
||||
@ -20,7 +20,7 @@ def sign_local(uri, data=None, a1="", web_session=""):
|
||||
chromium = playwright.chromium
|
||||
|
||||
# 如果一直失败可尝试设置成 False 让其打开浏览器,适当添加 sleep 可查看浏览器状态
|
||||
browser = chromium.launch(headless=True)
|
||||
browser = chromium.launch(headless=False)
|
||||
|
||||
browser_context = browser.new_context()
|
||||
browser_context.add_init_script(path=stealth_js_path)
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
from pathlib import Path
|
||||
|
||||
from conf import BASE_DIR
|
||||
from .main import XiaoHongShuVideo, XiaoHongShuImage, xiaohongshu_setup
|
||||
|
||||
Path(BASE_DIR / "cookies" / "xiaohongshu_uploader").mkdir(exist_ok=True)
|
||||
Path(BASE_DIR / "cookies" / "xiaohongshu_uploader").mkdir(exist_ok=True)
|
||||
|
||||
__all__ = ['XiaoHongShuVideo', 'XiaoHongShuImage', 'xiaohongshu_setup']
|
||||
@ -435,6 +435,891 @@ class XiaoHongShuVideo(object):
|
||||
# await page.screenshot(path=f"location_error_{location}.png")
|
||||
return False
|
||||
|
||||
async def main(self):
|
||||
async with async_playwright() as playwright:
|
||||
await self.upload(playwright)
|
||||
|
||||
|
||||
class XiaoHongShuImage(object):
|
||||
def __init__(self, title, image_paths, tags, publish_date: datetime, account_file, location=None, content=None, headless=True):
|
||||
self.title = title # 图文标题
|
||||
self.image_paths = image_paths if isinstance(image_paths, list) else [image_paths] # 支持单张或多张图片
|
||||
self.tags = tags
|
||||
self.publish_date = publish_date
|
||||
self.account_file = account_file
|
||||
self.location = location # 地点信息,可以从文本文件导入
|
||||
self.content = content # 正文内容,可以从文本文件导入
|
||||
self.date_format = '%Y年%m月%d日 %H:%M'
|
||||
self.local_executable_path = LOCAL_CHROME_PATH
|
||||
self.headless = headless
|
||||
|
||||
async def set_schedule_time_xiaohongshu(self, page, publish_date):
|
||||
"""设置定时发布时间"""
|
||||
print(" [-] 正在设置定时发布时间...")
|
||||
print(f"publish_date: {publish_date}")
|
||||
|
||||
# 选择包含特定文本内容的 label 元素
|
||||
label_element = page.locator("label:has-text('定时发布')")
|
||||
# 在选中的 label 元素下点击 checkbox
|
||||
await label_element.click()
|
||||
await asyncio.sleep(1)
|
||||
publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")
|
||||
print(f"publish_date_hour: {publish_date_hour}")
|
||||
|
||||
await asyncio.sleep(1)
|
||||
await page.locator('.el-input__inner[placeholder="选择日期和时间"]').click()
|
||||
await page.keyboard.press("Control+KeyA")
|
||||
await page.keyboard.type(str(publish_date_hour))
|
||||
await page.keyboard.press("Enter")
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def upload_images(self, page):
|
||||
"""上传图片"""
|
||||
from pathlib import Path
|
||||
xiaohongshu_logger.info(f'[+] 正在上传图片,共{len(self.image_paths)}张')
|
||||
|
||||
# 等待页面加载
|
||||
await asyncio.sleep(3)
|
||||
|
||||
# 查找上传元素(简化选择器,移除详细日志)
|
||||
upload_selectors = [
|
||||
"input[class='upload-input'][type='file'][multiple]",
|
||||
"input[accept='.jpg,.jpeg,.png,.webp']",
|
||||
"input[type='file'][multiple]",
|
||||
"input[type='file']",
|
||||
]
|
||||
|
||||
upload_input = None
|
||||
for selector in upload_selectors:
|
||||
try:
|
||||
upload_input = await page.wait_for_selector(selector, timeout=3000)
|
||||
if upload_input:
|
||||
break
|
||||
except:
|
||||
continue
|
||||
|
||||
if not upload_input:
|
||||
# 尝试点击上传按钮
|
||||
try:
|
||||
upload_button = await page.wait_for_selector('button:has-text("上传图片")', timeout=3000)
|
||||
if not upload_button:
|
||||
upload_button = await page.wait_for_selector('div:has-text("上传图片")', timeout=3000)
|
||||
|
||||
if upload_button:
|
||||
await upload_button.click()
|
||||
await asyncio.sleep(2)
|
||||
upload_input = await page.wait_for_selector("input[type='file']", timeout=3000)
|
||||
|
||||
if not upload_input:
|
||||
raise Exception("未找到图片上传元素")
|
||||
except Exception as e:
|
||||
raise Exception(f"图片上传失败: {e}")
|
||||
|
||||
# 上传图片(显示文件名而不是完整路径)
|
||||
file_names = [Path(p).name for p in self.image_paths]
|
||||
xiaohongshu_logger.info(f" [-] 上传文件: {', '.join(file_names)}")
|
||||
await upload_input.set_input_files(self.image_paths)
|
||||
|
||||
# 等待上传完成
|
||||
await asyncio.sleep(2)
|
||||
await self.wait_for_images_upload_complete(page)
|
||||
|
||||
async def wait_for_images_upload_complete(self, page):
|
||||
"""等待图片上传完成"""
|
||||
max_wait_time = 60
|
||||
wait_count = 0
|
||||
|
||||
while wait_count < max_wait_time:
|
||||
try:
|
||||
# 简化检查逻辑,移除详细日志
|
||||
# 检查添加按钮
|
||||
add_selectors = [
|
||||
'div.entry:has-text("添加")',
|
||||
'div:has-text("添加")',
|
||||
'[class*="add"]:has-text("添加")'
|
||||
]
|
||||
|
||||
for selector in add_selectors:
|
||||
try:
|
||||
add_button = await page.query_selector(selector)
|
||||
if add_button:
|
||||
xiaohongshu_logger.success(" [-] 图片上传完成")
|
||||
return
|
||||
except:
|
||||
continue
|
||||
|
||||
# 检查图片预览
|
||||
try:
|
||||
images = await page.query_selector_all('img')
|
||||
valid_images = []
|
||||
for img in images:
|
||||
src = await img.get_attribute('src')
|
||||
if src and ('data:image' in src or 'blob:' in src or len(src) > 50):
|
||||
valid_images.append(img)
|
||||
|
||||
if len(valid_images) >= len(self.image_paths):
|
||||
xiaohongshu_logger.success(f" [-] 图片上传完成 ({len(valid_images)}张)")
|
||||
await asyncio.sleep(2)
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
# 检查加载状态
|
||||
loading_elements = await page.query_selector_all('[class*="loading"], [class*="uploading"]')
|
||||
if not loading_elements:
|
||||
xiaohongshu_logger.success(" [-] 图片上传完成")
|
||||
return
|
||||
|
||||
# 减少日志频率:每15秒输出一次进度
|
||||
if wait_count % 15 == 0 and wait_count > 0:
|
||||
xiaohongshu_logger.info(f" [-] 等待图片上传... ({wait_count}/{max_wait_time}秒)")
|
||||
|
||||
await asyncio.sleep(3)
|
||||
wait_count += 3
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.debug(f" [-] 检查上传状态出错: {e}")
|
||||
await asyncio.sleep(3)
|
||||
wait_count += 3
|
||||
|
||||
xiaohongshu_logger.warning(" [-] 图片上传等待超时,继续流程")
|
||||
|
||||
async def locate_content_editor(self, page):
|
||||
"""定位正文编辑区域"""
|
||||
# 方法1:基于class的精确定位
|
||||
primary_selector = "div.editor-content"
|
||||
# 方法2:基于属性的备用定位
|
||||
backup_selector = "div[contenteditable='true'][role='textbox']"
|
||||
|
||||
xiaohongshu_logger.info(" [-] 查找正文输入区域...")
|
||||
|
||||
# 尝试主选择器
|
||||
try:
|
||||
element = await page.wait_for_selector(primary_selector, timeout=3000)
|
||||
xiaohongshu_logger.info(f" [-] 使用主选择器成功定位: {primary_selector}")
|
||||
return element, primary_selector
|
||||
except:
|
||||
xiaohongshu_logger.warning(" [-] 主选择器定位失败,尝试备用选择器...")
|
||||
|
||||
# 尝试备用选择器
|
||||
try:
|
||||
element = await page.wait_for_selector(backup_selector, timeout=3000)
|
||||
xiaohongshu_logger.info(f" [-] 使用备用选择器成功定位: {backup_selector}")
|
||||
return element, backup_selector
|
||||
except:
|
||||
xiaohongshu_logger.error(" [-] 所有选择器都无法定位正文区域")
|
||||
raise Exception("无法找到正文输入区域")
|
||||
|
||||
async def fill_content(self, page, human_typer):
|
||||
"""填充标题和内容"""
|
||||
xiaohongshu_logger.info(f' [-] 正在填充标题和话题...')
|
||||
|
||||
# 使用传入的人类化输入包装器(避免重复创建)
|
||||
|
||||
# 填充标题
|
||||
title_container = page.locator('div.plugin.title-container').locator('input.d-text')
|
||||
if await title_container.count():
|
||||
# 使用人类化输入填充标题
|
||||
success = await human_typer.type_text_human(
|
||||
'div.plugin.title-container input.d-text',
|
||||
self.title[:30],
|
||||
clear_first=True
|
||||
)
|
||||
|
||||
if not success:
|
||||
xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式")
|
||||
await title_container.fill(self.title[:30])
|
||||
else:
|
||||
# 使用人类化输入的备用方案
|
||||
success = await human_typer.type_text_human(".notranslate", self.title, clear_first=True)
|
||||
if not success:
|
||||
xiaohongshu_logger.warning("标题人类化输入失败,使用传统方式")
|
||||
titlecontainer = page.locator(".notranslate")
|
||||
await titlecontainer.click()
|
||||
await page.keyboard.press("Backspace")
|
||||
await page.keyboard.press("Control+KeyA")
|
||||
await page.keyboard.press("Delete")
|
||||
await page.keyboard.type(self.title)
|
||||
await page.keyboard.press("Enter")
|
||||
|
||||
# 定位正文编辑区域
|
||||
content_element, css_selector = await self.locate_content_editor(page)
|
||||
|
||||
# 🔧 创建专门用于正文输入的人类化输入包装器
|
||||
from utils.human_typing_wrapper import HumanTypingWrapper
|
||||
|
||||
# 根据正文长度调整输入速度配置
|
||||
content_length = len(self.content) if self.content else len(self.title) + 2
|
||||
|
||||
# 为长文本使用更慢的输入速度,提高真实性
|
||||
if content_length > 100:
|
||||
# 长文本:更慢更谨慎
|
||||
content_config = {
|
||||
'min_delay': 80, # 最小延迟80ms
|
||||
'max_delay': 200, # 最大延迟200ms
|
||||
'pause_probability': 0.15, # 15%概率暂停思考
|
||||
'pause_min': 800, # 暂停最少800ms
|
||||
'pause_max': 2000, # 暂停最多2秒
|
||||
'correction_probability': 0.02, # 2%概率打错字
|
||||
'backspace_probability': 0.01, # 1%概率退格重输
|
||||
}
|
||||
xiaohongshu_logger.info(f" [-] 长文本模式 ({content_length}字符),使用慢速人类化输入")
|
||||
else:
|
||||
# 短文本:相对较快但仍然人类化
|
||||
content_config = {
|
||||
'min_delay': 60, # 最小延迟60ms
|
||||
'max_delay': 150, # 最大延迟150ms
|
||||
'pause_probability': 0.1, # 10%概率暂停
|
||||
'pause_min': 500, # 暂停最少500ms
|
||||
'pause_max': 1200, # 暂停最多1.2秒
|
||||
'correction_probability': 0.01, # 1%概率打错字
|
||||
'backspace_probability': 0.005, # 0.5%概率退格
|
||||
}
|
||||
xiaohongshu_logger.info(f" [-] 短文本模式 ({content_length}字符),使用标准人类化输入")
|
||||
|
||||
# 创建专门的正文输入器
|
||||
content_typer = HumanTypingWrapper(page, content_config)
|
||||
|
||||
# 准备正文内容
|
||||
if self.content:
|
||||
# 如果有自定义正文内容,使用自定义内容
|
||||
content_text = self.content
|
||||
xiaohongshu_logger.info(f" [-] 使用自定义正文内容,长度: {len(content_text)} 字符")
|
||||
else:
|
||||
# 如果没有自定义内容,使用标题作为开头
|
||||
content_text = f"{self.title}\n\n"
|
||||
xiaohongshu_logger.info(" [-] 使用默认正文内容(标题)")
|
||||
|
||||
# 🔧 使用优化的人类化输入正文
|
||||
xiaohongshu_logger.info(f" [-] 开始人类化输入正文内容...")
|
||||
|
||||
# 对于长文本,分段输入更加真实
|
||||
if content_length > 200:
|
||||
xiaohongshu_logger.info(" [-] 长文本分段输入模式")
|
||||
success = await self._input_long_content_in_segments(page, content_typer, css_selector, content_text)
|
||||
else:
|
||||
# 短文本直接输入
|
||||
success = await content_typer.type_text_human(
|
||||
css_selector,
|
||||
content_text,
|
||||
clear_first=True
|
||||
)
|
||||
|
||||
if not success:
|
||||
xiaohongshu_logger.warning(" [-] 正文人类化输入失败,使用传统方式")
|
||||
await content_element.click()
|
||||
await asyncio.sleep(0.5) # 点击后稍作等待
|
||||
|
||||
# 传统方式也要模拟人类输入速度
|
||||
xiaohongshu_logger.info(" [-] 使用传统方式进行人类化输入...")
|
||||
await self._fallback_human_typing(page, content_text)
|
||||
|
||||
xiaohongshu_logger.success(f" [-] 正文输入完成,共 {len(content_text)} 字符")
|
||||
|
||||
# 在正文后面添加标签
|
||||
xiaohongshu_logger.info(" [-] 开始在正文后面添加标签...")
|
||||
|
||||
# 确保光标在正文的最后位置
|
||||
await content_element.click()
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# 移动光标到文本末尾
|
||||
await page.keyboard.press("End")
|
||||
await page.keyboard.press("Control+End") # 确保到达最末尾
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# 添加两个换行,将标签与正文分开
|
||||
await page.keyboard.press("Enter")
|
||||
await page.keyboard.press("Enter")
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# 标签输入(参考视频标签添加方式)
|
||||
xiaohongshu_logger.info(f" [-] 开始输入标签 ({len(self.tags)}个)...")
|
||||
|
||||
# 创建标签输入器
|
||||
from utils.human_typing_wrapper import HumanTypingWrapper
|
||||
tag_config = {
|
||||
'min_delay': 400, 'max_delay': 700, 'pause_probability': 0.25,
|
||||
'pause_min': 400, 'pause_max': 1000, 'correction_probability': 0.02,
|
||||
'backspace_probability': 0.01,
|
||||
}
|
||||
tag_typer = HumanTypingWrapper(page, tag_config)
|
||||
|
||||
# 输入标签(简化日志)
|
||||
success = True
|
||||
for i, tag in enumerate(self.tags):
|
||||
# 标签间思考时间
|
||||
if i > 0:
|
||||
import random
|
||||
await asyncio.sleep(random.uniform(0.8, 1.5))
|
||||
|
||||
# 输入标签
|
||||
tag_success = await tag_typer.type_text_human(
|
||||
css_selector, f"#{tag}", clear_first=False
|
||||
)
|
||||
|
||||
if not tag_success:
|
||||
success = False
|
||||
break
|
||||
|
||||
# 处理标签建议
|
||||
await self._handle_tag_suggestions_after_input(page, tag)
|
||||
|
||||
# 标签间分隔
|
||||
if i < len(self.tags) - 1:
|
||||
await page.keyboard.type(" ")
|
||||
import random
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5))
|
||||
|
||||
# 备用输入方式
|
||||
if not success:
|
||||
xiaohongshu_logger.warning(" [-] 使用备用标签输入方式")
|
||||
await self._fallback_tag_input(page, css_selector)
|
||||
|
||||
xiaohongshu_logger.success(f' [-] 标签输入完成 ({len(self.tags)}个)')
|
||||
|
||||
async def _handle_tag_suggestions_after_input(self, page: Page, tag: str) -> None:
|
||||
"""标签输入后处理建议选择"""
|
||||
try:
|
||||
import random
|
||||
await asyncio.sleep(random.uniform(0.5, 1.0))
|
||||
|
||||
suggestion_found = await self._handle_tag_suggestions(page, tag)
|
||||
|
||||
if not suggestion_found:
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5))
|
||||
await page.keyboard.press("Enter")
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.debug(f" [-] 标签建议处理出错: {e}")
|
||||
|
||||
async def _fallback_tag_input(self, page: Page, css_selector: str) -> None:
|
||||
"""备用标签输入方法"""
|
||||
try:
|
||||
import random
|
||||
await page.click(css_selector)
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
for index, tag in enumerate(self.tags, start=1):
|
||||
# 输入标签(移除详细日志)
|
||||
await page.keyboard.type("#")
|
||||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||||
|
||||
for char in tag:
|
||||
await page.keyboard.type(char, delay=random.randint(300, 600))
|
||||
await asyncio.sleep(random.uniform(0.8, 1.2))
|
||||
|
||||
# 标签间分隔
|
||||
if index < len(self.tags):
|
||||
await page.keyboard.type(" ")
|
||||
await asyncio.sleep(random.uniform(0.3, 0.6))
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.error(f" [-] 备用标签输入失败: {e}")
|
||||
|
||||
async def _input_long_content_in_segments(self, page: Page, content_typer, css_selector: str, content_text: str) -> bool:
|
||||
"""
|
||||
分段输入长文本,模拟真实的写作过程
|
||||
|
||||
Args:
|
||||
page: Playwright页面对象
|
||||
content_typer: 人类化输入包装器
|
||||
css_selector: 内容区域选择器
|
||||
content_text: 要输入的文本内容
|
||||
|
||||
Returns:
|
||||
bool: 是否输入成功
|
||||
"""
|
||||
try:
|
||||
import random
|
||||
|
||||
# 按段落分割文本(简化日志)
|
||||
paragraphs = content_text.split('\n\n')
|
||||
|
||||
# 清空输入区域
|
||||
await content_typer.clear_element(css_selector)
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
for i, paragraph in enumerate(paragraphs, 1):
|
||||
if not paragraph.strip():
|
||||
continue
|
||||
|
||||
# 输入段落内容(移除详细日志)
|
||||
success = await content_typer.type_text_human(
|
||||
css_selector, paragraph, clear_first=False
|
||||
)
|
||||
|
||||
if not success:
|
||||
xiaohongshu_logger.warning(f" [-] 第 {i} 段落输入失败")
|
||||
return False
|
||||
|
||||
# 段落间添加换行和思考时间
|
||||
if i < len(paragraphs):
|
||||
await page.keyboard.press("Enter")
|
||||
await page.keyboard.press("Enter")
|
||||
await asyncio.sleep(random.uniform(1.0, 3.0))
|
||||
|
||||
xiaohongshu_logger.success(" [-] 分段输入完成")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.error(f" [-] 分段输入失败: {e}")
|
||||
return False
|
||||
|
||||
async def _fallback_human_typing(self, page: Page, content_text: str) -> None:
|
||||
"""
|
||||
备用的人类化输入方法
|
||||
|
||||
Args:
|
||||
page: Playwright页面对象
|
||||
content_text: 要输入的文本内容
|
||||
"""
|
||||
import random
|
||||
|
||||
char_count = 0
|
||||
for char in content_text:
|
||||
await page.keyboard.type(char)
|
||||
char_count += 1
|
||||
|
||||
# 随机延迟,模拟人类打字
|
||||
delay = random.randint(50, 120) # 50-120ms随机延迟
|
||||
await asyncio.sleep(delay / 1000)
|
||||
|
||||
# 偶尔暂停,模拟思考(移除详细日志)
|
||||
if random.random() < 0.05: # 5%概率暂停
|
||||
pause_time = random.randint(300, 800)
|
||||
await asyncio.sleep(pause_time / 1000)
|
||||
|
||||
# 减少进度日志频率:每100个字符显示一次
|
||||
if char_count % 100 == 0:
|
||||
xiaohongshu_logger.debug(f" [-] 输入进度: {char_count}/{len(content_text)}")
|
||||
|
||||
async def _input_single_tag(self, page: Page, tag: str, current: int, total: int) -> None:
|
||||
"""
|
||||
输入单个标签并智能处理建议选择(增强人类化行为)
|
||||
|
||||
Args:
|
||||
page: Playwright页面对象
|
||||
tag: 标签内容
|
||||
current: 当前标签序号
|
||||
total: 总标签数量
|
||||
"""
|
||||
import random
|
||||
|
||||
tag_text = f"#{tag}"
|
||||
xiaohongshu_logger.info(f" [-] 输入标签: {tag_text} ({current}/{total})")
|
||||
|
||||
try:
|
||||
# 🔧 1. 标签输入前的思考暂停(模拟用户思考下一个标签)
|
||||
if current > 1: # 第一个标签不需要思考时间
|
||||
think_time = random.uniform(0.5, 2.0) # 0.5-2秒思考时间
|
||||
xiaohongshu_logger.debug(f" [-] 思考下一个标签... ({think_time:.1f}秒)")
|
||||
await asyncio.sleep(think_time)
|
||||
|
||||
# 🔧 2. 更人类化的逐字符输入
|
||||
await self._human_like_tag_typing(page, tag_text)
|
||||
|
||||
# 🔧 3. 输入完成后的短暂停顿(模拟用户检查输入)
|
||||
check_pause = random.uniform(0.3, 0.8)
|
||||
await asyncio.sleep(check_pause)
|
||||
|
||||
# 🔧 4. 等待并处理标签建议(随机等待时间)
|
||||
suggestion_wait = random.uniform(0.6, 1.2) # 0.6-1.2秒随机等待
|
||||
xiaohongshu_logger.debug(f" [-] 等待标签建议... ({suggestion_wait:.1f}秒)")
|
||||
await asyncio.sleep(suggestion_wait)
|
||||
|
||||
# 5. 查找标签建议
|
||||
suggestion_found = await self._handle_tag_suggestions(page, tag)
|
||||
|
||||
if suggestion_found:
|
||||
xiaohongshu_logger.info(f" [-] 选择了匹配的标签建议: {tag}")
|
||||
else:
|
||||
# 🔧 没有匹配建议时的犹豫行为
|
||||
hesitate_time = random.uniform(0.2, 0.6) # 犹豫0.2-0.6秒
|
||||
xiaohongshu_logger.debug(f" [-] 未找到建议,犹豫中... ({hesitate_time:.1f}秒)")
|
||||
await asyncio.sleep(hesitate_time)
|
||||
|
||||
xiaohongshu_logger.info(f" [-] 未找到匹配建议,生成新标签: {tag}")
|
||||
await page.keyboard.press("Enter")
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5)) # 随机确认时间
|
||||
|
||||
# 🔧 6. 标签间的自然间隔
|
||||
if current < total:
|
||||
# 模拟用户在标签间的自然停顿
|
||||
inter_tag_pause = random.uniform(0.3, 0.8)
|
||||
await asyncio.sleep(inter_tag_pause)
|
||||
|
||||
await page.keyboard.type(" ")
|
||||
|
||||
# 空格后的微小停顿
|
||||
space_pause = random.uniform(0.1, 0.3)
|
||||
await asyncio.sleep(space_pause)
|
||||
|
||||
xiaohongshu_logger.info(f" [-] 标签处理完成: {tag} ({current}/{total})")
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.error(f" [-] 输入标签 {tag} 时出错: {e}")
|
||||
# 出错时也要模拟人类的反应时间
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5))
|
||||
await page.keyboard.press("Enter")
|
||||
await asyncio.sleep(random.uniform(0.3, 0.6))
|
||||
|
||||
async def _human_like_tag_typing(self, page: Page, tag_text: str) -> None:
|
||||
"""
|
||||
更人类化的标签输入方法
|
||||
|
||||
Args:
|
||||
page: Playwright页面对象
|
||||
tag_text: 要输入的标签文本
|
||||
"""
|
||||
import random
|
||||
|
||||
# 模拟不同的打字节奏
|
||||
for i, char in enumerate(tag_text):
|
||||
# 🔧 更宽泛的延迟范围,模拟真实打字速度变化
|
||||
if char == '#':
|
||||
# 输入#号时稍慢一些(用户需要按shift+3)
|
||||
delay = random.randint(120, 250)
|
||||
elif char.isalpha():
|
||||
# 字母输入相对较快
|
||||
delay = random.randint(60, 180)
|
||||
else:
|
||||
# 其他字符(数字、符号)稍慢
|
||||
delay = random.randint(80, 200)
|
||||
|
||||
await page.keyboard.type(char)
|
||||
await asyncio.sleep(delay / 1000)
|
||||
|
||||
# 🔧 模拟偶尔的打字错误和修正(2%概率)
|
||||
if random.random() < 0.02 and i < len(tag_text) - 1:
|
||||
# 打错一个字符
|
||||
wrong_char = random.choice('abcdefghijklmnopqrstuvwxyz')
|
||||
await page.keyboard.type(wrong_char)
|
||||
await asyncio.sleep(random.randint(100, 300) / 1000)
|
||||
|
||||
# 发现错误,退格删除
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5)) # 发现错误的反应时间
|
||||
await page.keyboard.press("Backspace")
|
||||
await asyncio.sleep(random.randint(80, 150) / 1000)
|
||||
|
||||
xiaohongshu_logger.debug(f" [-] 模拟打字错误并修正")
|
||||
|
||||
# 🔧 模拟偶尔的思考停顿(5%概率)
|
||||
if random.random() < 0.05 and i < len(tag_text) - 1:
|
||||
pause_time = random.uniform(0.3, 0.8)
|
||||
xiaohongshu_logger.debug(f" [-] 输入中思考停顿 ({pause_time:.1f}秒)")
|
||||
await asyncio.sleep(pause_time)
|
||||
|
||||
async def _handle_tag_suggestions(self, page: Page, tag: str) -> bool:
|
||||
"""
|
||||
处理标签建议选择
|
||||
|
||||
Args:
|
||||
page: Playwright页面对象
|
||||
tag: 标签内容
|
||||
|
||||
Returns:
|
||||
bool: 是否找到并选择了匹配的建议
|
||||
"""
|
||||
try:
|
||||
xiaohongshu_logger.info(f" [-] 查找标签 '{tag}' 的建议...")
|
||||
|
||||
# 查找标签建议容器的多种可能选择器
|
||||
suggestion_selectors = [
|
||||
'div[class*="suggestion"]',
|
||||
'div[class*="dropdown"]',
|
||||
'div[class*="popover"]',
|
||||
'ul[class*="options"]',
|
||||
'div[class*="tag-suggestion"]',
|
||||
'[role="listbox"]',
|
||||
'[role="menu"]'
|
||||
]
|
||||
|
||||
suggestion_container = None
|
||||
for selector in suggestion_selectors:
|
||||
try:
|
||||
suggestion_container = await page.wait_for_selector(selector, timeout=1000)
|
||||
if suggestion_container:
|
||||
xiaohongshu_logger.debug(f" 找到建议容器: {selector}")
|
||||
break
|
||||
except:
|
||||
continue
|
||||
|
||||
if not suggestion_container:
|
||||
xiaohongshu_logger.debug(" [-] 未找到标签建议容器")
|
||||
return False
|
||||
|
||||
# 查找匹配的标签建议
|
||||
suggestion_items = await page.query_selector_all(
|
||||
f'{suggestion_selectors[0]} div, '
|
||||
f'{suggestion_selectors[0]} li, '
|
||||
f'{suggestion_selectors[0]} span'
|
||||
)
|
||||
|
||||
xiaohongshu_logger.debug(f" [-] 找到 {len(suggestion_items)} 个建议项")
|
||||
|
||||
# 寻找最佳匹配
|
||||
best_match = None
|
||||
exact_match = None
|
||||
|
||||
for item in suggestion_items:
|
||||
try:
|
||||
item_text = await item.inner_text()
|
||||
if not item_text:
|
||||
continue
|
||||
|
||||
# 清理文本(移除#号和额外空格)
|
||||
clean_text = item_text.strip()
|
||||
if clean_text.startswith('#'):
|
||||
clean_text = clean_text[1:].strip()
|
||||
|
||||
xiaohongshu_logger.debug(f" 建议项: {clean_text}")
|
||||
|
||||
# 精确匹配
|
||||
if clean_text == tag:
|
||||
exact_match = item
|
||||
xiaohongshu_logger.info(f" [-] 找到精确匹配: {clean_text}")
|
||||
break
|
||||
|
||||
# 包含匹配(作为备选)
|
||||
if tag in clean_text or clean_text in tag:
|
||||
if not best_match:
|
||||
best_match = item
|
||||
xiaohongshu_logger.debug(f" 备选匹配: {clean_text}")
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.debug(f" 处理建议项时出错: {e}")
|
||||
continue
|
||||
|
||||
# 选择最佳匹配
|
||||
selected_item = exact_match or best_match
|
||||
|
||||
if selected_item:
|
||||
try:
|
||||
# 🔧 模拟用户查看和选择建议的过程
|
||||
import random
|
||||
|
||||
# 查看建议的时间(用户需要读取和比较)
|
||||
review_time = random.uniform(0.4, 1.0)
|
||||
xiaohongshu_logger.debug(f" [-] 查看标签建议... ({review_time:.1f}秒)")
|
||||
await asyncio.sleep(review_time)
|
||||
|
||||
# 偶尔犹豫一下是否选择这个建议(10%概率)
|
||||
if random.random() < 0.1:
|
||||
hesitate_time = random.uniform(0.3, 0.7)
|
||||
xiaohongshu_logger.debug(f" [-] 犹豫是否选择建议... ({hesitate_time:.1f}秒)")
|
||||
await asyncio.sleep(hesitate_time)
|
||||
|
||||
await selected_item.click()
|
||||
|
||||
# 点击后的确认时间
|
||||
confirm_time = random.uniform(0.2, 0.5)
|
||||
await asyncio.sleep(confirm_time)
|
||||
|
||||
xiaohongshu_logger.info(f" [-] 成功选择标签建议")
|
||||
return True
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.warning(f" [-] 点击标签建议失败: {e}")
|
||||
return False
|
||||
else:
|
||||
xiaohongshu_logger.debug(f" [-] 未找到匹配的标签建议")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.debug(f" [-] 处理标签建议时出错: {e}")
|
||||
return False
|
||||
|
||||
async def set_location(self, page: Page, location: str) -> bool:
|
||||
"""设置地理位置信息"""
|
||||
xiaohongshu_logger.info(f" [-] 开始设置地理位置: {location}")
|
||||
|
||||
try:
|
||||
# 1. 点击地点输入框
|
||||
xiaohongshu_logger.info(" [-] 点击地点输入框...")
|
||||
selectors = [
|
||||
'div.d-select--color-text-title--color-bg-fill',
|
||||
'div.d-text.d-select-placeholder.d-text-ellipsis.d-text-nowrap',
|
||||
'div[class*="d-select"]'
|
||||
]
|
||||
|
||||
clicked = False
|
||||
for selector in selectors:
|
||||
try:
|
||||
element = await page.wait_for_selector(selector, timeout=3000)
|
||||
await element.click()
|
||||
clicked = True
|
||||
break
|
||||
except:
|
||||
continue
|
||||
|
||||
if not clicked:
|
||||
xiaohongshu_logger.error(" [-] 未找到地点输入框")
|
||||
return False
|
||||
|
||||
# 2. 输入地点名称
|
||||
xiaohongshu_logger.info(f" [-] 输入地点名称: {location}")
|
||||
await page.keyboard.press("Control+a")
|
||||
await page.keyboard.type(location)
|
||||
await asyncio.sleep(2) # 等待下拉选项加载
|
||||
|
||||
# 3. 选择匹配的地点选项
|
||||
xiaohongshu_logger.info(" [-] 查找匹配的地点选项...")
|
||||
|
||||
# 尝试多种选择器找到包含地点名称的选项
|
||||
option_selectors = [
|
||||
f'//div[contains(@class, "name") and contains(text(), "{location}")]',
|
||||
f'//div[contains(text(), "{location}市")]',
|
||||
f'//div[contains(text(), "{location}")]'
|
||||
]
|
||||
|
||||
selected = False
|
||||
for selector in option_selectors:
|
||||
try:
|
||||
options = await page.query_selector_all(selector)
|
||||
if options:
|
||||
# 选择第一个匹配的选项
|
||||
option = options[0]
|
||||
option_text = await option.inner_text()
|
||||
await option.click()
|
||||
xiaohongshu_logger.success(f" [-] 成功选择地点: {option_text}")
|
||||
selected = True
|
||||
break
|
||||
except:
|
||||
continue
|
||||
|
||||
if not selected:
|
||||
xiaohongshu_logger.warning(f" [-] 未找到匹配的地点选项: {location}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.error(f" [-] 设置地理位置失败: {e}")
|
||||
return False
|
||||
|
||||
async def upload(self, playwright: Playwright) -> None:
|
||||
"""主要的上传流程"""
|
||||
# 🔧 使用增强的反检测浏览器配置
|
||||
from utils.anti_detection import AntiDetectionConfig
|
||||
import random
|
||||
|
||||
# 反检测浏览器参数
|
||||
browser_args = AntiDetectionConfig.STANDARD_BROWSER_ARGS.copy()
|
||||
|
||||
# 使用 Chromium 浏览器启动一个浏览器实例
|
||||
if self.local_executable_path:
|
||||
browser = await playwright.chromium.launch(
|
||||
headless=self.headless,
|
||||
executable_path=self.local_executable_path,
|
||||
args=browser_args # 🔧 添加反检测参数
|
||||
)
|
||||
else:
|
||||
browser = await playwright.chromium.launch(
|
||||
headless=self.headless,
|
||||
args=browser_args # 🔧 添加反检测参数
|
||||
)
|
||||
|
||||
# 🔧 创建增强的浏览器上下文
|
||||
context_options = {
|
||||
"storage_state": f"{self.account_file}",
|
||||
"locale": "zh-CN",
|
||||
"timezone_id": "Asia/Shanghai"
|
||||
}
|
||||
|
||||
# 🔧 为无头模式添加完整的反检测设置
|
||||
if self.headless:
|
||||
context_options.update({
|
||||
'viewport': {'width': 1920, 'height': 1080}, # 🔧 使用文档建议的分辨率
|
||||
'device_scale_factor': 1,
|
||||
'has_touch': False,
|
||||
'is_mobile': False
|
||||
})
|
||||
|
||||
# 使用随机用户代理
|
||||
user_agent = random.choice(AntiDetectionConfig.REAL_USER_AGENTS)
|
||||
context_options["user_agent"] = user_agent
|
||||
xiaohongshu_logger.info(f" [-] 无头模式设置: 1920x1080")
|
||||
xiaohongshu_logger.info(f" [-] 使用用户代理: {user_agent[:50]}...")
|
||||
else:
|
||||
# 有头模式使用较小的窗口
|
||||
context_options["viewport"] = {"width": 1600, "height": 900}
|
||||
xiaohongshu_logger.info(f" [-] 有头模式设置: 1600x900")
|
||||
|
||||
context = await browser.new_context(**context_options)
|
||||
context = await set_init_script(context)
|
||||
|
||||
# 创建一个新的页面
|
||||
page = await context.new_page()
|
||||
|
||||
# 🔧 创建人类化输入包装器(关键修复)
|
||||
human_typer = create_human_typer(page)
|
||||
xiaohongshu_logger.info(" [-] 已创建人类化输入包装器")
|
||||
|
||||
# 直接访问小红书图文发布页面
|
||||
await page.goto("https://creator.xiaohongshu.com/publish/publish?from=tab_switch&target=image")
|
||||
xiaohongshu_logger.info(f'[+]正在上传图文-------{self.title}')
|
||||
|
||||
# 等待页面加载
|
||||
xiaohongshu_logger.info(f'[-] 正在打开图文发布页面...')
|
||||
await page.wait_for_url("https://creator.xiaohongshu.com/publish/publish*")
|
||||
|
||||
# 上传图片
|
||||
await self.upload_images(page)
|
||||
|
||||
# 填充内容(传递人类化输入包装器)
|
||||
await self.fill_content(page, human_typer)
|
||||
|
||||
# 设置位置(如果有指定地点)
|
||||
if self.location and self.location.strip():
|
||||
xiaohongshu_logger.info(f" [-] 开始设置地理位置: {self.location}")
|
||||
await self.set_location(page, self.location)
|
||||
else:
|
||||
xiaohongshu_logger.info(" [-] 未指定地点或地点为空,跳过位置设置")
|
||||
|
||||
# 设置定时发布(如果需要)
|
||||
if self.publish_date != 0:
|
||||
await self.set_schedule_time_xiaohongshu(page, self.publish_date)
|
||||
|
||||
# 发布图文(增强反检测等待策略)
|
||||
xiaohongshu_logger.info(" [-] 准备发布图文...")
|
||||
await asyncio.sleep(1) # 发布前等待
|
||||
|
||||
while True:
|
||||
try:
|
||||
# 等待并点击发布按钮
|
||||
if self.publish_date != 0:
|
||||
xiaohongshu_logger.info(" [-] 点击定时发布按钮...")
|
||||
await page.locator('button:has-text("定时发布")').click()
|
||||
else:
|
||||
xiaohongshu_logger.info(" [-] 点击发布按钮...")
|
||||
await page.locator('button:has-text("发布")').click()
|
||||
|
||||
# 增加发布后的等待时间
|
||||
await asyncio.sleep(1)
|
||||
|
||||
await page.wait_for_url(
|
||||
"https://creator.xiaohongshu.com/publish/success?**",
|
||||
timeout=5000 # 增加超时时间到5秒
|
||||
)
|
||||
xiaohongshu_logger.success(" [-]图文发布成功")
|
||||
break
|
||||
except Exception as e:
|
||||
xiaohongshu_logger.info(" [-] 图文正在发布中...")
|
||||
xiaohongshu_logger.debug(f" [-] 等待详情: {str(e)}")
|
||||
await page.screenshot(full_page=True)
|
||||
# 使用随机等待时间,模拟人类行为
|
||||
import random
|
||||
wait_time = random.uniform(1.0, 2.0) # 1-2秒随机等待
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
# 保存cookie并关闭浏览器
|
||||
await context.storage_state(path=self.account_file)
|
||||
xiaohongshu_logger.success(' [-]cookie更新完毕!')
|
||||
await asyncio.sleep(2)
|
||||
await context.close()
|
||||
await browser.close()
|
||||
|
||||
async def main(self):
|
||||
async with async_playwright() as playwright:
|
||||
await self.upload(playwright)
|
||||
232
utils/enhanced_human_typing.py
Normal file
232
utils/enhanced_human_typing.py
Normal file
@ -0,0 +1,232 @@
|
||||
import time
|
||||
import random
|
||||
import re
|
||||
import asyncio
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
class EnhancedHumanTypingSimulator:
|
||||
def __init__(self, page=None):
|
||||
# 保留原方案的简单配置
|
||||
self.base_config = {
|
||||
'min_typing_speed': 5,
|
||||
'max_typing_speed': 15,
|
||||
'pause_probability': 0.1,
|
||||
'chunk_input': True,
|
||||
'max_chunk_length': 50
|
||||
}
|
||||
|
||||
# 新增高级特性配置
|
||||
self.advanced_config = {
|
||||
# 人类状态模拟
|
||||
'energy_level': random.uniform(0.7, 1.0),
|
||||
'typing_proficiency': random.uniform(0.6, 0.9),
|
||||
'emotion_state': random.uniform(0.8, 1.0),
|
||||
|
||||
# 错误处理
|
||||
'base_error_rate': random.uniform(0.02, 0.05),
|
||||
'error_correction_speed': random.uniform(0.3, 0.8),
|
||||
|
||||
# 速度控制
|
||||
'speed_variance': random.uniform(0.1, 0.2),
|
||||
'burst_speed_probability': 0.1
|
||||
}
|
||||
|
||||
self.page = page
|
||||
self.typing_session = {
|
||||
'start_time': None,
|
||||
'chars_typed': 0,
|
||||
'last_break_time': time.time()
|
||||
}
|
||||
|
||||
async def type_text(self, text: str, selector: str = None) -> bool:
|
||||
"""增强版的文本输入方法"""
|
||||
try:
|
||||
if selector:
|
||||
# 等待并点击元素
|
||||
await self._prepare_input(selector)
|
||||
|
||||
# 初始化会话
|
||||
self.typing_session['start_time'] = time.time()
|
||||
|
||||
# 智能分段
|
||||
chunks = self._smart_split_text(text)
|
||||
|
||||
for chunk in chunks:
|
||||
# 获取当前状态
|
||||
current_state = self._get_current_state()
|
||||
|
||||
# 输入当前段落
|
||||
await self._type_chunk(chunk, current_state)
|
||||
|
||||
# 段落间自然停顿
|
||||
await self._natural_pause(current_state)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"输入文本时出错: {e}")
|
||||
return False
|
||||
|
||||
def _smart_split_text(self, text: str) -> List[str]:
|
||||
"""智能文本分段"""
|
||||
paragraphs = text.split('\n')
|
||||
chunks = []
|
||||
|
||||
for para in paragraphs:
|
||||
if len(para) <= self.base_config['max_chunk_length']:
|
||||
if para.strip():
|
||||
chunks.append(para)
|
||||
continue
|
||||
|
||||
sentences = re.split(r'([。!?,:;])', para)
|
||||
current_chunk = ''
|
||||
|
||||
for sent in sentences:
|
||||
if len(current_chunk) + len(sent) < self.base_config['max_chunk_length']:
|
||||
current_chunk += sent
|
||||
else:
|
||||
if current_chunk.strip():
|
||||
chunks.append(current_chunk)
|
||||
current_chunk = sent
|
||||
|
||||
if current_chunk.strip():
|
||||
chunks.append(current_chunk)
|
||||
|
||||
return chunks
|
||||
|
||||
def _get_current_state(self) -> Dict:
|
||||
"""获取当前输入状态"""
|
||||
typing_duration = time.time() - self.typing_session['start_time']
|
||||
fatigue = min(typing_duration / 300, 0.7)
|
||||
|
||||
self.advanced_config['energy_level'] *= (1 - fatigue * 0.1)
|
||||
self.advanced_config['emotion_state'] *= random.uniform(0.98, 1.02)
|
||||
|
||||
return {
|
||||
'energy_level': max(0.3, self.advanced_config['energy_level']),
|
||||
'emotion_state': max(0.4, min(1.0, self.advanced_config['emotion_state'])),
|
||||
'typing_proficiency': self.advanced_config['typing_proficiency'],
|
||||
'current_error_rate': self._calculate_error_rate(fatigue)
|
||||
}
|
||||
|
||||
async def _type_chunk(self, chunk: str, state: Dict):
|
||||
"""输入文本块"""
|
||||
for char in chunk:
|
||||
typing_speed = self._calculate_typing_speed(state)
|
||||
|
||||
if random.random() < state['current_error_rate']:
|
||||
await self._handle_typing_error(char, state)
|
||||
else:
|
||||
await self._type_char(char, typing_speed)
|
||||
|
||||
self.typing_session['chars_typed'] += 1
|
||||
await self._micro_pause(state)
|
||||
|
||||
def _calculate_typing_speed(self, state: Dict) -> float:
|
||||
"""计算实时打字速度"""
|
||||
base_speed = random.uniform(
|
||||
self.base_config['min_typing_speed'],
|
||||
self.base_config['max_typing_speed']
|
||||
)
|
||||
|
||||
speed = base_speed * (
|
||||
0.7 + state['energy_level'] * 0.3 +
|
||||
state['emotion_state'] * 0.2 +
|
||||
state['typing_proficiency'] * 0.3
|
||||
)
|
||||
|
||||
speed *= random.uniform(
|
||||
1 - self.advanced_config['speed_variance'],
|
||||
1 + self.advanced_config['speed_variance']
|
||||
)
|
||||
|
||||
return speed
|
||||
|
||||
def _calculate_error_rate(self, fatigue: float) -> float:
|
||||
"""计算当前错误率"""
|
||||
base_rate = self.advanced_config['base_error_rate']
|
||||
error_rate = base_rate * (1 + fatigue)
|
||||
error_rate *= random.uniform(0.8, 1.2)
|
||||
return min(error_rate, 0.15)
|
||||
|
||||
async def _handle_typing_error(self, char: str, state: Dict):
|
||||
"""处理打字错误"""
|
||||
error_types = ['typo', 'double_hit', 'delay']
|
||||
error_type = random.choice(error_types)
|
||||
|
||||
if error_type == 'typo':
|
||||
wrong_char = self._get_similar_char(char)
|
||||
await self._type_char(wrong_char, self._calculate_typing_speed(state))
|
||||
await asyncio.sleep(random.uniform(0.2, 0.5))
|
||||
await self._press_key("Backspace")
|
||||
await self._type_char(char, self._calculate_typing_speed(state))
|
||||
|
||||
elif error_type == 'double_hit':
|
||||
await self._type_char(char, self._calculate_typing_speed(state))
|
||||
await self._type_char(char, self._calculate_typing_speed(state))
|
||||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||||
await self._press_key("Backspace")
|
||||
|
||||
else: # delay
|
||||
await asyncio.sleep(random.uniform(0.3, 0.8))
|
||||
await self._type_char(char, self._calculate_typing_speed(state))
|
||||
|
||||
async def _natural_pause(self, state: Dict):
|
||||
"""自然停顿"""
|
||||
base_pause = random.uniform(0.5, 1.5)
|
||||
|
||||
if state['energy_level'] < 0.5:
|
||||
base_pause *= 1.3
|
||||
if state['emotion_state'] < 0.6:
|
||||
base_pause *= 1.2
|
||||
|
||||
await asyncio.sleep(base_pause * random.uniform(0.8, 1.2))
|
||||
|
||||
async def _micro_pause(self, state: Dict):
|
||||
"""字符间的微小停顿"""
|
||||
pause_time = random.uniform(0.05, 0.15)
|
||||
if state['energy_level'] < 0.5:
|
||||
pause_time *= 1.2
|
||||
await asyncio.sleep(pause_time)
|
||||
|
||||
def _get_similar_char(self, char: str) -> str:
|
||||
"""获取相似字符"""
|
||||
similar_chars = {
|
||||
'的': '地得',
|
||||
'了': '着啦',
|
||||
'和': '与跟',
|
||||
'我': '我我',
|
||||
'是': '市师',
|
||||
'在': '再在',
|
||||
'有': '又有',
|
||||
'都': '都读',
|
||||
'好': '号毫'
|
||||
}
|
||||
return random.choice(similar_chars.get(char, char + char))
|
||||
|
||||
async def _prepare_input(self, selector: str):
|
||||
"""准备输入"""
|
||||
try:
|
||||
await self.page.wait_for_selector(selector, timeout=5000)
|
||||
await self.page.click(selector)
|
||||
await asyncio.sleep(random.uniform(0.3, 0.8))
|
||||
except Exception as e:
|
||||
print(f"准备输入失败: {e}")
|
||||
raise
|
||||
|
||||
async def _type_char(self, char: str, speed: float):
|
||||
"""输入单个字符"""
|
||||
try:
|
||||
delay = 1000 / speed # 转换为毫秒
|
||||
await self.page.keyboard.type(char, delay=delay)
|
||||
except Exception as e:
|
||||
print(f"输入字符失败: {e}")
|
||||
raise
|
||||
|
||||
async def _press_key(self, key: str):
|
||||
"""按键操作"""
|
||||
try:
|
||||
await self.page.keyboard.press(key)
|
||||
except Exception as e:
|
||||
print(f"按键操作失败: {e}")
|
||||
raise
|
||||
Loading…
x
Reference in New Issue
Block a user