增加了海报上传,修复了定时发布

This commit is contained in:
jinye_huang 2025-06-10 15:42:05 +08:00
parent a03cbf3bac
commit d30ce36fae

View File

@ -5,6 +5,7 @@
import json
import time
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from selenium import webdriver
@ -13,6 +14,8 @@ from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import os
import base64
class DouyinUtils:
@ -110,18 +113,18 @@ class VideoUploader:
# 可以通过检查进度条、上传状态等来判断是否完成
# 这里使用简单的时间等待,实际可以更智能
time.sleep(10) # 基础等待时间
## TODO: 这个要重新做一下
# 尝试检查是否有上传进度指示器
for i in range(timeout // 5):
try:
# 检查是否有上传完成的标志
progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]")
if "100%" in progress_element.text or "完成" in progress_element.text:
return True
except NoSuchElementException:
pass
# for i in range(timeout // 5):
# try:
# # 检查是否有上传完成的标志
# progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]")
# if "100%" in progress_element.text or "完成" in progress_element.text:
# return True
# except NoSuchElementException:
# pass
time.sleep(5)
# time.sleep(5)
return True # 如果没有找到进度指示器,假设上传完成
@ -189,6 +192,7 @@ class ProductLinker:
return False
print("团购商品选择完成")
time.sleep(random.random()*1.5)
return True
except Exception as e:
@ -261,6 +265,238 @@ class ProductLinker:
confirm_locator = (By.XPATH, "//div[@class='footer-button-DL8zDh']")
return self.utils.safe_click(self.driver, confirm_locator, use_js=True)
class PostManager:
"""海报设定模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def _click_poster_setting(self) -> bool:
"""点击海报设定"""
try:
# 尝试多个可能的海报设置按钮选择器
poster_selectors = [
(By.XPATH, "//div[@class='background-OpVteV filter-k_CjvJ']"),
# (By.XPATH, "//div[contains(@class, 'background') and contains(@class, 'filter')]"),
# (By.XPATH, "//div[contains(text(), '设置封面') or contains(text(), '海报')]"),
# (By.CSS_SELECTOR, ".background-OpVteV.filter-k_CjvJ")
]
for selector in poster_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("海报设置按钮点击成功")
time.sleep(1) # 等待面板打开
return True
print("未找到海报设置按钮")
return False
except Exception as e:
print(f"点击海报设置按钮失败: {e}")
return False
def _upload_poster_file(self, poster_path: str) -> bool:
"""上传海报文件"""
try:
print(f"开始上传海报文件: {poster_path}")
# 检查文件是否存在
if not os.path.exists(poster_path):
print(f"海报文件不存在: {poster_path}")
return False
# 方法1: 使用send_keys方式首选
if self._upload_via_send_keys(poster_path):
print("使用send_keys方式上传成功")
return True
# 方法2: 使用base64编码方式备选
print("send_keys方式失败尝试base64方式...")
if self._upload_via_base64(poster_path):
print("使用base64方式上传成功")
return True
print("所有上传方式都失败了")
return False
except Exception as e:
print(f"上传海报文件失败: {e}")
return False
def _upload_via_send_keys(self, poster_path: str) -> bool:
"""使用send_keys方式上传文件"""
try:
# 查找隐藏的文件上传输入框,模仿视频上传的方式
file_input_selectors = [
# 海报上传的隐藏input元素主要选择器
(By.XPATH, "//input[@class='semi-upload-hidden-input' and @type='file']"),
(By.XPATH, "//input[contains(@class, 'semi-upload-hidden-input')]"),
# 通用的隐藏文件输入框
(By.XPATH, "//input[@type='file' and contains(@accept, 'image')]"),
(By.XPATH, "//input[@type='file' and @style='display: none;']"),
# 备选方案
(By.XPATH, "//div[contains(@class, 'semi-upload')]//input[@type='file']"),
(By.CSS_SELECTOR, "input[type='file'].semi-upload-hidden-input")
]
file_input = None
for selector in file_input_selectors:
file_input = self.utils.wait_for_element(self.driver, selector, timeout=5)
if file_input:
print(f"找到文件输入框: {selector}")
break
if file_input:
# 直接发送文件路径(模仿视频上传方式)
file_input.send_keys(poster_path)
print("海报文件已通过send_keys发送到输入框")
# 等待上传完成
time.sleep(3)
return True
else:
print("未找到文件上传输入框")
return False
except Exception as e:
print(f"send_keys上传失败: {e}")
return False
def _upload_via_base64(self, poster_path: str) -> bool:
"""使用base64编码方式上传文件"""
try:
# 读取文件并转换为base64
with open(poster_path, 'rb') as f:
file_content = f.read()
base64_content = base64.b64encode(file_content).decode('utf-8')
# 获取文件类型
file_extension = poster_path.lower().split('.')[-1]
mime_types = {
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'gif': 'image/gif',
'webp': 'image/webp'
}
mime_type = mime_types.get(file_extension, 'image/jpeg')
# 构造data URL
data_url = f"data:{mime_type};base64,{base64_content}"
# 尝试通过JavaScript上传
upload_success = self.driver.execute_script("""
// 查找上传区域
const uploadArea = document.querySelector('.semi-upload-drag-area') ||
document.querySelector('[class*="upload"]') ||
document.querySelector('input[type="file"]');
if (!uploadArea) {
return false;
}
// 创建文件对象
function dataURLtoFile(dataurl, filename) {
const arr = dataurl.split(',');
const mime = arr[0].match(/:(.*?);/)[1];
const bstr = atob(arr[1]);
let n = bstr.length;
const u8arr = new Uint8Array(n);
while(n--) {
u8arr[n] = bstr.charCodeAt(n);
}
return new File([u8arr], filename, {type: mime});
}
// 转换为File对象
const file = dataURLtoFile(arguments[0], arguments[1]);
// 创建FileList
const dt = new DataTransfer();
dt.items.add(file);
// 查找input[type="file"]元素
const fileInput = document.querySelector('input[type="file"]');
if (fileInput) {
fileInput.files = dt.files;
// 触发change事件
const event = new Event('change', { bubbles: true });
fileInput.dispatchEvent(event);
return true;
}
return false;
""", data_url, os.path.basename(poster_path))
if upload_success:
print("base64上传触发成功")
time.sleep(3) # 等待上传处理
return True
else:
print("base64上传触发失败")
return False
except Exception as e:
print(f"base64上传失败: {e}")
return False
def _confirm_poster_setting(self) -> bool:
"""确认海报设定"""
try:
# 等待一段时间让上传完成
time.sleep(2)
# 尝试多个可能的确认按钮选择器
confirm_selectors = [
(By.XPATH, "//button[@class='semi-button semi-button-primary semi-button-light primary-RstHX_']"),
(By.XPATH, "//button[contains(@class, 'semi-button-primary') and (contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存'))]"),
(By.XPATH, "//button[contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存')]"),
(By.CSS_SELECTOR, ".semi-button-primary")
]
for selector in confirm_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("海报设置确认成功")
time.sleep(1)
return True
print("未找到确认按钮,可能已自动保存")
return True # 有些情况下可能自动保存,不需要确认按钮
except Exception as e:
print(f"确认海报设定失败: {e}")
return False
def set_poster(self, poster_path: str) -> bool:
"""设置海报的完整流程"""
try:
print(f"开始设置海报: {poster_path}")
# 1. 点击海报设置按钮
if not self._click_poster_setting():
print("点击海报设置按钮失败")
return False
# 2. 上传海报文件
if not self._upload_poster_file(poster_path):
print("上传海报文件失败")
return False
# 3. 确认设置
if not self._confirm_poster_setting():
print("确认海报设置失败")
return False
print("海报设置完成")
time.sleep(random.random() * 1.5)
return True
except Exception as e:
print(f"设置海报失败: {e}")
return False
class ScheduleManager:
"""定时发布管理模块"""
@ -280,13 +516,29 @@ class ScheduleManager:
# 1. 选择定时发布选项
if not self._select_schedule_option():
print("选择定时发布选项失败")
return False
# 2. 输入时间
# 2. 等待定时发布界面加载完成
if not self._wait_for_schedule_interface():
print("等待定时发布界面超时")
return False
# 3. 输入时间
if not self._input_schedule_time(schedule_time):
print("输入定时时间失败")
return False
# 4. 验证时间是否设置正确
if not self._verify_schedule_time(schedule_time):
print("时间验证失败,尝试重新设置")
# 重试一次
if not self._input_schedule_time(schedule_time):
print("重试输入时间失败")
return False
print("定时发布设置完成")
time.sleep(random.random() * 1.5)
return True
except Exception as e:
@ -295,39 +547,158 @@ class ScheduleManager:
def _select_schedule_option(self) -> bool:
"""选择定时发布选项"""
schedule_locator = (By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label")
return self.utils.safe_click(self.driver, schedule_locator, use_js=True)
try:
# 尝试多个可能的定时发布选择器
schedule_selectors = [
(By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//label[contains(@class, 'radio')]//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//input[@type='radio' and @value='schedule']/../label")
]
for selector in schedule_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("成功选择定时发布选项")
time.sleep(2) # 增加等待时间,让界面完全加载
return True
print("未找到定时发布选项")
return False
except Exception as e:
print(f"选择定时发布选项失败: {e}")
return False
def _wait_for_schedule_interface(self) -> bool:
"""等待定时发布界面完全加载"""
try:
print("等待定时发布界面加载...")
# 等待时间输入框出现
time_input_selectors = [
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
(By.XPATH, "//input[contains(@placeholder, '日期和时间')]"),
(By.XPATH, "//input[contains(@placeholder, '选择日期')]"),
(By.CSS_SELECTOR, "input[placeholder*='日期']")
]
for selector in time_input_selectors:
element = self.utils.wait_for_element(self.driver, selector, timeout=10)
if element:
print("定时发布界面加载完成")
time.sleep(1) # 额外等待确保界面稳定
return True
print("定时发布界面加载超时")
return False
except Exception as e:
print(f"等待定时发布界面失败: {e}")
return False
def _input_schedule_time(self, time_str: str) -> bool:
"""输入定时发布时间"""
try:
time_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']")
)
print(f"开始输入时间: {time_str}")
# 查找时间输入框
time_input_selectors = [
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
(By.XPATH, "//input[contains(@placeholder, '日期和时间')]"),
(By.XPATH, "//input[contains(@placeholder, '选择日期')]")
]
time_input = None
for selector in time_input_selectors:
time_input = self.utils.wait_for_element(self.driver, selector, timeout=5)
if time_input:
# 使用JavaScript设置时间值确保触发相关事件
print(f"找到时间输入框: {selector}")
break
if not time_input:
print("未找到时间输入框")
return False
# 清空输入框
time_input.clear()
time.sleep(0.5)
# 方法1: 直接输入
try:
time_input.send_keys(time_str)
time.sleep(1)
print("使用send_keys方法输入时间")
except Exception as e:
print(f"send_keys输入失败: {e}")
# 方法2: 使用JavaScript设置值作为备选
try:
self.driver.execute_script("""
const input = arguments[0];
const dateValue = arguments[1];
// 清空并设置值
input.value = '';
input.focus();
input.value = dateValue;
// 触发事件确保组件更新
const inputEvent = new Event('input', { bubbles: true });
const changeEvent = new Event('change', { bubbles: true });
input.dispatchEvent(inputEvent);
input.dispatchEvent(changeEvent);
// 触发多种事件确保组件更新
const events = ['input', 'change', 'blur', 'keyup'];
events.forEach(eventType => {
const event = new Event(eventType, { bubbles: true });
input.dispatchEvent(event);
});
// 模拟失去焦点
input.blur();
""", time_input, time_str)
time.sleep(1)
print("使用JavaScript方法设置时间")
except Exception as e:
print(f"JavaScript设置时间失败: {e}")
# 等待界面处理
time.sleep(2)
return True
return False
except Exception as e:
print(f"输入定时时间失败: {e}")
return False
def _verify_schedule_time(self, expected_time: str) -> bool:
"""验证设置的时间是否正确"""
try:
print(f"验证设置的时间: {expected_time}")
# 查找时间输入框并检查其值
time_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
timeout=5
)
if time_input:
actual_value = time_input.get_attribute('value')
print(f"输入框实际值: {actual_value}")
print(f"期望值: {expected_time}")
# 简单的值比较(可能需要根据实际格式调整)
if actual_value and (expected_time in actual_value or actual_value in expected_time):
print("时间验证成功")
return True
else:
print(f"时间验证失败: 期望 {expected_time}, 实际 {actual_value}")
return False
else:
print("无法找到时间输入框进行验证")
return False
except Exception as e:
print(f"验证时间失败: {e}")
return False
class DouyinScheduler:
"""抖音发布调度器主类"""
@ -341,6 +712,7 @@ class DouyinScheduler:
self.content_editor = ContentEditor(self.driver)
self.product_linker = ProductLinker(self.driver)
self.schedule_manager = ScheduleManager(self.driver)
self.poster_manager = PostManager(self.driver)
self.utils = DouyinUtils()
# 初始化浏览器和登录
@ -373,7 +745,8 @@ class DouyinScheduler:
# 刷新页面
self.driver.refresh()
self.utils.smart_wait(self.driver, 5)
self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web")
self.utils.smart_wait(self.driver, 5)
def _load_cookies(self):
"""加载cookies"""
try:
@ -399,6 +772,7 @@ class DouyinScheduler:
video_path: str,
title: str,
description: str = "",
poster_path: str = None,
product_name: str = None,
product_info: str = None,
schedule_time: str = None) -> bool:
@ -431,13 +805,19 @@ class DouyinScheduler:
print("商品挂载失败")
return False
# 4. 设置定时发布(如果提供)
# 4. 设置海报(如果提供)
if poster_path:
if not self.poster_manager.set_poster(poster_path):
print("海报设置失败")
return False
# 5. 设置定时发布(如果提供)
if schedule_time:
if not self.schedule_manager.set_schedule_time(schedule_time):
print("定时设置失败")
return False
# 5. 发布
# 6. 发布
return self._publish()
except Exception as e:
@ -448,6 +828,13 @@ class DouyinScheduler:
"""执行发布操作"""
try:
publish_locator = (By.XPATH, "//button[@class='button-dhlUZE primary-cECiOJ fixed-J9O8Yw']")
publish_button = self.utils.wait_for_element(self.driver, publish_locator)
if publish_button:
print("发布按钮已找到")
else:
print("发布按钮未找到")
return False
success = self.utils.safe_click(self.driver, publish_locator, use_js=True)
if success:
@ -508,26 +895,37 @@ class DouyinConfig:
if __name__ == "__main__":
# 使用示例
scheduler = DouyinScheduler("cookies.json", "safari")
scheduler = DouyinScheduler("cookies/cookies.json", "safari")
try:
# 发布视频示例
video_path = "/Users/yarrow/autoPublisher/video/gdsc/12575_1749192602.mp4"
title = "暑期遛娃宝藏地!科学中心解锁知识亲子趣时光~"
description = """
暑假想让娃远离电子产品
不想让娃在家 "葛优躺"
来广东科学中心亲子开启边玩边学模式
👉 点击左下角购票给娃一个 "科技含金量" 满满的暑假
📍 地址广州大学城科普路 168
🚇 地铁 4 号线大学城北站转 801 路直达
#广东科学中心 #暑期遛娃 #科技启蒙 #暑假去哪玩 #亲子时光
"""
product_name = "广东科学中心门票--亲子1大1小票"
product_info = "广东科学中心门票"
schedule_time = "2025-06-10 19:10"
poster_path = "/Users/yarrow/autoPublisher/video/gdsc/poster/1.png"
success = scheduler.publish_video_with_product(
video_path="/Users/yarrow/autoPublisher/video/gdsc/2011_1749089195_raw.mp4",
title="广州周末亲子游",
description="带孩子探索科学的奥秘 #广州亲子游 #科学中心",
product_name="广东科学中心门票--亲子1大1小票",
product_info="广东科学中心门票",
schedule_time="2025-06-05 12:30"
video_path=video_path,
title=title,
description=description,
poster_path=poster_path,
product_name=product_name,
product_info=product_info,
schedule_time=schedule_time
)
if success:
print("发布流程完成")
else:
print("发布流程失败")
# 等待用户确认
scheduler.wait_for_user_confirmation("按回车键退出...")
except Exception as e:
print(f"发布失败: {e}")
finally:
scheduler.close()