tiktokAutoPublisher/douyin_scheduler.py

931 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
抖音发布调度器库
支持视频上传、内容编辑、商品挂载、定时发布等功能
"""
import json
import time
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import os
import base64
class DouyinUtils:
"""工具类,提供通用的等待和操作方法"""
@staticmethod
def smart_wait(driver, timeout=10):
"""智能等待,根据页面加载状态动态调整等待时间"""
return WebDriverWait(driver, timeout)
@staticmethod
def safe_click(driver, element_locator, timeout=10, use_js=False):
"""安全点击元素支持JavaScript点击"""
try:
wait = WebDriverWait(driver, timeout)
element = wait.until(EC.element_to_be_clickable(element_locator))
if use_js:
driver.execute_script("arguments[0].click();", element)
else:
element.click()
return True
except TimeoutException:
print(f"元素点击超时: {element_locator}")
return False
@staticmethod
def safe_send_keys(driver, element_locator, text, timeout=10, clear_first=True):
"""安全输入文本"""
try:
wait = WebDriverWait(driver, timeout)
element = wait.until(EC.presence_of_element_located(element_locator))
if clear_first:
element.clear()
element.send_keys(text)
return True
except TimeoutException:
print(f"文本输入超时: {element_locator}")
return False
@staticmethod
def wait_for_element(driver, element_locator, timeout=10):
"""等待元素出现"""
try:
wait = WebDriverWait(driver, timeout)
return wait.until(EC.presence_of_element_located(element_locator))
except TimeoutException:
print(f"等待元素超时: {element_locator}")
return None
class VideoUploader:
"""视频上传模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def upload_video(self, video_path: str) -> bool:
"""上传视频文件"""
try:
print(f"开始上传视频: {video_path}")
# 查找隐藏的文件输入框
file_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@style='display: none;' or @type='file']"),
timeout=15
)
if file_input:
file_input.send_keys(video_path)
print("视频文件已发送到输入框")
# 等待上传完成的标志
upload_success = self._wait_for_upload_complete()
if upload_success:
print("视频上传完成")
return True
else:
print("视频上传可能未完成")
return False
else:
print("未找到文件输入框")
return False
except Exception as e:
print(f"视频上传失败: {e}")
return False
def _wait_for_upload_complete(self, timeout=60) -> bool:
"""等待视频上传完成"""
try:
# 可以通过检查进度条、上传状态等来判断是否完成
# 这里使用简单的时间等待,实际可以更智能
time.sleep(10) # 基础等待时间
## TODO: 这个要重新做一下
# 尝试检查是否有上传进度指示器
# for i in range(timeout // 5):
# try:
# # 检查是否有上传完成的标志
# progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]")
# if "100%" in progress_element.text or "完成" in progress_element.text:
# return True
# except NoSuchElementException:
# pass
# time.sleep(5)
return True # 如果没有找到进度指示器,假设上传完成
except Exception as e:
print(f"等待上传完成时出错: {e}")
return True
class ContentEditor:
"""内容编辑模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def set_title(self, title: str) -> bool:
"""设置视频标题"""
title_locator = (By.XPATH, "//input[@placeholder='填写作品标题,为作品获得更多流量']")
success = self.utils.safe_send_keys(self.driver, title_locator, title)
if success:
print(f"标题设置成功: {title}")
return success
def set_description(self, description: str) -> bool:
"""设置视频描述"""
desc_locator = (By.XPATH, "//div[@data-placeholder='添加作品简介']")
success = self.utils.safe_send_keys(self.driver, desc_locator, description)
if success:
print(f"描述设置成功: {description}")
return success
def set_content(self, title: str, description: str = "") -> bool:
"""设置标题和描述"""
title_success = self.set_title(title)
desc_success = self.set_description(description) if description else True
return title_success and desc_success
class ProductLinker:
"""商品挂载模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def select_group_buy(self, product_name: str, product_info: str) -> bool:
"""选择团购商品"""
try:
print(f"开始选择团购商品: {product_name}")
# 1. 点击下拉框选择团购类型
if not self._select_content_type():
return False
# 2. 选择具体商品
if not self._select_product(product_name):
return False
# 3. 输入商品信息
if not self._input_product_info(product_info):
return False
# 4. 确认选择
if not self._confirm_selection():
return False
print("团购商品选择完成")
time.sleep(random.random()*1.5)
return True
except Exception as e:
print(f"选择团购商品失败: {e}")
return False
def _select_content_type(self) -> bool:
"""选择内容类型为团购"""
# 点击下拉框
tab_selector_locator = (By.XPATH, "//div[@class='semi-select select-lJTtRL semi-select-single']")
if not self.utils.safe_click(self.driver, tab_selector_locator, use_js=True):
return False
time.sleep(1) # 等待下拉框展开
# 选择团购选项
group_buy_locator = (By.XPATH, "//div[@data-code='13010' and @class='select-dropdown-option-video']")
return self.utils.safe_click(self.driver, group_buy_locator, use_js=True)
def _select_product(self, product_name: str) -> bool:
"""选择具体商品"""
# 点击商品选择下拉框
product_selector_locator = (By.XPATH, "//div[@class='semi-select select-Qm4u8S semi-select-single semi-select-filterable']")
if not self.utils.safe_click(self.driver, product_selector_locator, use_js=True):
return False
time.sleep(1)
# 输入商品名称进行搜索
product_input_locator = (By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='']")
if not self.utils.safe_send_keys(self.driver, product_input_locator, product_name, clear_first=False):
return False
time.sleep(2) # 等待搜索结果
# 选择匹配的商品
return self._click_matching_product_option(product_name)
def _click_matching_product_option(self, product_name: str, max_retries: int = 10) -> bool:
"""点击匹配的商品选项"""
for attempt in range(max_retries):
clicked = self.driver.execute_script("""
let elements = document.querySelectorAll('.semi-select-option');
for (let el of elements) {
if (el.textContent.includes(arguments[0])) {
el.click();
return true;
}
}
return false;
""", product_name)
if clicked:
print(f"成功选择商品,尝试次数: {attempt + 1}")
return True
else:
print(f"等待商品选项,尝试次数: {attempt + 1}/{max_retries}")
time.sleep(1)
print("未能找到匹配的商品选项")
return False
def _input_product_info(self, product_info: str) -> bool:
"""输入商品信息"""
product_info_locator = (By.XPATH, "//input[@class='semi-input semi-input-default' and contains(@placeholder, '如:')]")
return self.utils.safe_send_keys(self.driver, product_info_locator, product_info)
def _confirm_selection(self) -> bool:
"""确认商品选择"""
confirm_locator = (By.XPATH, "//div[@class='footer-button-DL8zDh']")
return self.utils.safe_click(self.driver, confirm_locator, use_js=True)
class PostManager:
"""海报设定模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def _click_poster_setting(self) -> bool:
"""点击海报设定"""
try:
# 尝试多个可能的海报设置按钮选择器
poster_selectors = [
(By.XPATH, "//div[@class='background-OpVteV filter-k_CjvJ']"),
# (By.XPATH, "//div[contains(@class, 'background') and contains(@class, 'filter')]"),
# (By.XPATH, "//div[contains(text(), '设置封面') or contains(text(), '海报')]"),
# (By.CSS_SELECTOR, ".background-OpVteV.filter-k_CjvJ")
]
for selector in poster_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("海报设置按钮点击成功")
time.sleep(1) # 等待面板打开
return True
print("未找到海报设置按钮")
return False
except Exception as e:
print(f"点击海报设置按钮失败: {e}")
return False
def _upload_poster_file(self, poster_path: str) -> bool:
"""上传海报文件"""
try:
print(f"开始上传海报文件: {poster_path}")
# 检查文件是否存在
if not os.path.exists(poster_path):
print(f"海报文件不存在: {poster_path}")
return False
# 方法1: 使用send_keys方式首选
if self._upload_via_send_keys(poster_path):
print("使用send_keys方式上传成功")
return True
# 方法2: 使用base64编码方式备选
print("send_keys方式失败尝试base64方式...")
if self._upload_via_base64(poster_path):
print("使用base64方式上传成功")
return True
print("所有上传方式都失败了")
return False
except Exception as e:
print(f"上传海报文件失败: {e}")
return False
def _upload_via_send_keys(self, poster_path: str) -> bool:
"""使用send_keys方式上传文件"""
try:
# 查找隐藏的文件上传输入框,模仿视频上传的方式
file_input_selectors = [
# 海报上传的隐藏input元素主要选择器
(By.XPATH, "//input[@class='semi-upload-hidden-input' and @type='file']"),
(By.XPATH, "//input[contains(@class, 'semi-upload-hidden-input')]"),
# 通用的隐藏文件输入框
(By.XPATH, "//input[@type='file' and contains(@accept, 'image')]"),
(By.XPATH, "//input[@type='file' and @style='display: none;']"),
# 备选方案
(By.XPATH, "//div[contains(@class, 'semi-upload')]//input[@type='file']"),
(By.CSS_SELECTOR, "input[type='file'].semi-upload-hidden-input")
]
file_input = None
for selector in file_input_selectors:
file_input = self.utils.wait_for_element(self.driver, selector, timeout=5)
if file_input:
print(f"找到文件输入框: {selector}")
break
if file_input:
# 直接发送文件路径(模仿视频上传方式)
file_input.send_keys(poster_path)
print("海报文件已通过send_keys发送到输入框")
# 等待上传完成
time.sleep(3)
return True
else:
print("未找到文件上传输入框")
return False
except Exception as e:
print(f"send_keys上传失败: {e}")
return False
def _upload_via_base64(self, poster_path: str) -> bool:
"""使用base64编码方式上传文件"""
try:
# 读取文件并转换为base64
with open(poster_path, 'rb') as f:
file_content = f.read()
base64_content = base64.b64encode(file_content).decode('utf-8')
# 获取文件类型
file_extension = poster_path.lower().split('.')[-1]
mime_types = {
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'gif': 'image/gif',
'webp': 'image/webp'
}
mime_type = mime_types.get(file_extension, 'image/jpeg')
# 构造data URL
data_url = f"data:{mime_type};base64,{base64_content}"
# 尝试通过JavaScript上传
upload_success = self.driver.execute_script("""
// 查找上传区域
const uploadArea = document.querySelector('.semi-upload-drag-area') ||
document.querySelector('[class*="upload"]') ||
document.querySelector('input[type="file"]');
if (!uploadArea) {
return false;
}
// 创建文件对象
function dataURLtoFile(dataurl, filename) {
const arr = dataurl.split(',');
const mime = arr[0].match(/:(.*?);/)[1];
const bstr = atob(arr[1]);
let n = bstr.length;
const u8arr = new Uint8Array(n);
while(n--) {
u8arr[n] = bstr.charCodeAt(n);
}
return new File([u8arr], filename, {type: mime});
}
// 转换为File对象
const file = dataURLtoFile(arguments[0], arguments[1]);
// 创建FileList
const dt = new DataTransfer();
dt.items.add(file);
// 查找input[type="file"]元素
const fileInput = document.querySelector('input[type="file"]');
if (fileInput) {
fileInput.files = dt.files;
// 触发change事件
const event = new Event('change', { bubbles: true });
fileInput.dispatchEvent(event);
return true;
}
return false;
""", data_url, os.path.basename(poster_path))
if upload_success:
print("base64上传触发成功")
time.sleep(3) # 等待上传处理
return True
else:
print("base64上传触发失败")
return False
except Exception as e:
print(f"base64上传失败: {e}")
return False
def _confirm_poster_setting(self) -> bool:
"""确认海报设定"""
try:
# 等待一段时间让上传完成
time.sleep(2)
# 尝试多个可能的确认按钮选择器
confirm_selectors = [
(By.XPATH, "//button[@class='semi-button semi-button-primary semi-button-light primary-RstHX_']"),
(By.XPATH, "//button[contains(@class, 'semi-button-primary') and (contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存'))]"),
(By.XPATH, "//button[contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存')]"),
(By.CSS_SELECTOR, ".semi-button-primary")
]
for selector in confirm_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("海报设置确认成功")
time.sleep(1)
return True
print("未找到确认按钮,可能已自动保存")
return True # 有些情况下可能自动保存,不需要确认按钮
except Exception as e:
print(f"确认海报设定失败: {e}")
return False
def set_poster(self, poster_path: str) -> bool:
"""设置海报的完整流程"""
try:
print(f"开始设置海报: {poster_path}")
# 1. 点击海报设置按钮
if not self._click_poster_setting():
print("点击海报设置按钮失败")
return False
# 2. 上传海报文件
if not self._upload_poster_file(poster_path):
print("上传海报文件失败")
return False
# 3. 确认设置
if not self._confirm_poster_setting():
print("确认海报设置失败")
return False
print("海报设置完成")
time.sleep(random.random() * 1.5)
return True
except Exception as e:
print(f"设置海报失败: {e}")
return False
class ScheduleManager:
"""定时发布管理模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def set_schedule_time(self, schedule_time: str) -> bool:
"""设置定时发布时间
Args:
schedule_time: 时间字符串,格式如 '2025-06-05 12:30'
"""
try:
print(f"设置定时发布: {schedule_time}")
# 1. 选择定时发布选项
if not self._select_schedule_option():
print("选择定时发布选项失败")
return False
# 2. 等待定时发布界面加载完成
if not self._wait_for_schedule_interface():
print("等待定时发布界面超时")
return False
# 3. 输入时间
if not self._input_schedule_time(schedule_time):
print("输入定时时间失败")
return False
# 4. 验证时间是否设置正确
if not self._verify_schedule_time(schedule_time):
print("时间验证失败,尝试重新设置")
# 重试一次
if not self._input_schedule_time(schedule_time):
print("重试输入时间失败")
return False
print("定时发布设置完成")
time.sleep(random.random() * 1.5)
return True
except Exception as e:
print(f"设置定时发布失败: {e}")
return False
def _select_schedule_option(self) -> bool:
"""选择定时发布选项"""
try:
# 尝试多个可能的定时发布选择器
schedule_selectors = [
(By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//label[contains(@class, 'radio')]//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//span[contains(text(), '定时发布')]/ancestor::label"),
(By.XPATH, "//input[@type='radio' and @value='schedule']/../label")
]
for selector in schedule_selectors:
if self.utils.safe_click(self.driver, selector, use_js=True):
print("成功选择定时发布选项")
time.sleep(2) # 增加等待时间,让界面完全加载
return True
print("未找到定时发布选项")
return False
except Exception as e:
print(f"选择定时发布选项失败: {e}")
return False
def _wait_for_schedule_interface(self) -> bool:
"""等待定时发布界面完全加载"""
try:
print("等待定时发布界面加载...")
# 等待时间输入框出现
time_input_selectors = [
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
(By.XPATH, "//input[contains(@placeholder, '日期和时间')]"),
(By.XPATH, "//input[contains(@placeholder, '选择日期')]"),
(By.CSS_SELECTOR, "input[placeholder*='日期']")
]
for selector in time_input_selectors:
element = self.utils.wait_for_element(self.driver, selector, timeout=10)
if element:
print("定时发布界面加载完成")
time.sleep(1) # 额外等待确保界面稳定
return True
print("定时发布界面加载超时")
return False
except Exception as e:
print(f"等待定时发布界面失败: {e}")
return False
def _input_schedule_time(self, time_str: str) -> bool:
"""输入定时发布时间"""
try:
print(f"开始输入时间: {time_str}")
# 查找时间输入框
time_input_selectors = [
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
(By.XPATH, "//input[contains(@placeholder, '日期和时间')]"),
(By.XPATH, "//input[contains(@placeholder, '选择日期')]")
]
time_input = None
for selector in time_input_selectors:
time_input = self.utils.wait_for_element(self.driver, selector, timeout=5)
if time_input:
print(f"找到时间输入框: {selector}")
break
if not time_input:
print("未找到时间输入框")
return False
# 清空输入框
time_input.clear()
time.sleep(0.5)
# 方法1: 直接输入
try:
time_input.send_keys(time_str)
time.sleep(1)
print("使用send_keys方法输入时间")
except Exception as e:
print(f"send_keys输入失败: {e}")
# 方法2: 使用JavaScript设置值作为备选
try:
self.driver.execute_script("""
const input = arguments[0];
const dateValue = arguments[1];
// 清空并设置值
input.value = '';
input.focus();
input.value = dateValue;
// 触发多种事件确保组件更新
const events = ['input', 'change', 'blur', 'keyup'];
events.forEach(eventType => {
const event = new Event(eventType, { bubbles: true });
input.dispatchEvent(event);
});
// 模拟失去焦点
input.blur();
""", time_input, time_str)
time.sleep(1)
print("使用JavaScript方法设置时间")
except Exception as e:
print(f"JavaScript设置时间失败: {e}")
# 等待界面处理
time.sleep(2)
return True
except Exception as e:
print(f"输入定时时间失败: {e}")
return False
def _verify_schedule_time(self, expected_time: str) -> bool:
"""验证设置的时间是否正确"""
try:
print(f"验证设置的时间: {expected_time}")
# 查找时间输入框并检查其值
time_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"),
timeout=5
)
if time_input:
actual_value = time_input.get_attribute('value')
print(f"输入框实际值: {actual_value}")
print(f"期望值: {expected_time}")
# 简单的值比较(可能需要根据实际格式调整)
if actual_value and (expected_time in actual_value or actual_value in expected_time):
print("时间验证成功")
return True
else:
print(f"时间验证失败: 期望 {expected_time}, 实际 {actual_value}")
return False
else:
print("无法找到时间输入框进行验证")
return False
except Exception as e:
print(f"验证时间失败: {e}")
return False
class DouyinScheduler:
"""抖音发布调度器主类"""
def __init__(self, cookies_path: str, browser_type: str = "safari"):
self.driver = self._init_driver(browser_type)
self.cookies_path = cookies_path
# 初始化各个模块
self.video_uploader = VideoUploader(self.driver)
self.content_editor = ContentEditor(self.driver)
self.product_linker = ProductLinker(self.driver)
self.schedule_manager = ScheduleManager(self.driver)
self.poster_manager = PostManager(self.driver)
self.utils = DouyinUtils()
# 初始化浏览器和登录
self._setup_browser()
def _init_driver(self, browser_type: str):
"""初始化浏览器驱动"""
if browser_type.lower() == "safari":
driver = webdriver.Safari()
elif browser_type.lower() == "chrome":
driver = webdriver.Chrome()
elif browser_type.lower() == "firefox":
driver = webdriver.Firefox()
else:
raise ValueError(f"不支持的浏览器类型: {browser_type}")
driver.maximize_window()
driver.set_page_load_timeout(30)
return driver
def _setup_browser(self):
"""设置浏览器并加载cookies"""
# 访问抖音创作页面
self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web")
self.utils.smart_wait(self.driver, 5)
# 加载cookies
self._load_cookies()
# 刷新页面
self.driver.refresh()
self.utils.smart_wait(self.driver, 5)
self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web")
self.utils.smart_wait(self.driver, 5)
def _load_cookies(self):
"""加载cookies"""
try:
with open(self.cookies_path, 'r') as f:
cookies = json.load(f)
for cookie in cookies:
# 移除可能导致问题的属性
for attr in ['sameSite', 'expiry']:
cookie.pop(attr, None)
try:
self.driver.add_cookie(cookie)
except Exception as e:
print(f"添加cookie失败: {e}")
print("Cookies加载完成")
except Exception as e:
print(f"加载cookies失败: {e}")
def publish_video_with_product(self,
video_path: str,
title: str,
description: str = "",
poster_path: str = None,
product_name: str = None,
product_info: str = None,
schedule_time: str = None) -> bool:
"""发布带商品的视频
Args:
video_path: 视频文件路径
title: 视频标题
description: 视频描述
product_name: 商品名称
product_info: 商品信息
schedule_time: 定时发布时间,格式如 '2025-06-05 12:30'
"""
try:
print("开始发布流程...")
# 1. 上传视频
if not self.video_uploader.upload_video(video_path):
print("视频上传失败")
return False
# 2. 设置内容
if not self.content_editor.set_content(title, description):
print("内容设置失败")
return False
# 3. 挂载商品(如果提供)
if product_name and product_info:
if not self.product_linker.select_group_buy(product_name, product_info):
print("商品挂载失败")
return False
# 4. 设置海报(如果提供)
if poster_path:
if not self.poster_manager.set_poster(poster_path):
print("海报设置失败")
return False
# 5. 设置定时发布(如果提供)
if schedule_time:
if not self.schedule_manager.set_schedule_time(schedule_time):
print("定时设置失败")
return False
# 6. 发布
return self._publish()
except Exception as e:
print(f"发布流程失败: {e}")
return False
def _publish(self) -> bool:
"""执行发布操作"""
try:
publish_locator = (By.XPATH, "//button[@class='button-dhlUZE primary-cECiOJ fixed-J9O8Yw']")
publish_button = self.utils.wait_for_element(self.driver, publish_locator)
if publish_button:
print("发布按钮已找到")
else:
print("发布按钮未找到")
return False
success = self.utils.safe_click(self.driver, publish_locator, use_js=True)
if success:
print("发布成功!")
return True
else:
print("点击发布按钮失败")
return False
except Exception as e:
print(f"发布失败: {e}")
return False
def wait_for_user_confirmation(self, message: str = "按回车键继续..."):
"""等待用户确认"""
input(message)
def close(self):
"""关闭浏览器"""
if self.driver:
self.driver.quit()
print("浏览器已关闭")
# 配置类
class DouyinConfig:
"""配置管理类"""
def __init__(self, config_file: str = None):
self.config = {}
if config_file:
self.load_config(config_file)
def load_config(self, config_file: str):
"""从文件加载配置"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
except Exception as e:
print(f"加载配置文件失败: {e}")
def get(self, key: str, default=None):
"""获取配置值"""
return self.config.get(key, default)
def set(self, key: str, value):
"""设置配置值"""
self.config[key] = value
def save_config(self, config_file: str):
"""保存配置到文件"""
try:
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存配置文件失败: {e}")
if __name__ == "__main__":
# 使用示例
scheduler = DouyinScheduler("cookies/cookies.json", "safari")
try:
video_path = "/Users/yarrow/autoPublisher/video/gdsc/12575_1749192602.mp4"
title = "暑期遛娃宝藏地!科学中心解锁知识亲子趣时光~"
description = """
暑假想让娃远离电子产品?
不想让娃在家 "葛优躺"
来广东科学中心,亲子开启「边玩边学」模式!
👉 点击左下角购票,给娃一个 "科技含金量" 满满的暑假!
📍 地址:广州大学城科普路 168 号
🚇 地铁 4 号线大学城北站转 801 路直达
#广东科学中心 #暑期遛娃 #科技启蒙 #暑假去哪玩 #亲子时光
"""
product_name = "广东科学中心门票--亲子1大1小票"
product_info = "广东科学中心门票"
schedule_time = "2025-06-10 19:10"
poster_path = "/Users/yarrow/autoPublisher/video/gdsc/poster/1.png"
success = scheduler.publish_video_with_product(
video_path=video_path,
title=title,
description=description,
poster_path=poster_path,
product_name=product_name,
product_info=product_info,
schedule_time=schedule_time
)
# 等待用户确认
scheduler.wait_for_user_confirmation("按回车键退出...")
except Exception as e:
print(f"发布失败: {e}")
finally:
scheduler.close()