tiktokAutoPublisher/douyin_scheduler.py

533 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
抖音发布调度器库
支持视频上传、内容编辑、商品挂载、定时发布等功能
"""
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import TimeoutException, NoSuchElementException
class DouyinUtils:
"""工具类,提供通用的等待和操作方法"""
@staticmethod
def smart_wait(driver, timeout=10):
"""智能等待,根据页面加载状态动态调整等待时间"""
return WebDriverWait(driver, timeout)
@staticmethod
def safe_click(driver, element_locator, timeout=10, use_js=False):
"""安全点击元素支持JavaScript点击"""
try:
wait = WebDriverWait(driver, timeout)
element = wait.until(EC.element_to_be_clickable(element_locator))
if use_js:
driver.execute_script("arguments[0].click();", element)
else:
element.click()
return True
except TimeoutException:
print(f"元素点击超时: {element_locator}")
return False
@staticmethod
def safe_send_keys(driver, element_locator, text, timeout=10, clear_first=True):
"""安全输入文本"""
try:
wait = WebDriverWait(driver, timeout)
element = wait.until(EC.presence_of_element_located(element_locator))
if clear_first:
element.clear()
element.send_keys(text)
return True
except TimeoutException:
print(f"文本输入超时: {element_locator}")
return False
@staticmethod
def wait_for_element(driver, element_locator, timeout=10):
"""等待元素出现"""
try:
wait = WebDriverWait(driver, timeout)
return wait.until(EC.presence_of_element_located(element_locator))
except TimeoutException:
print(f"等待元素超时: {element_locator}")
return None
class VideoUploader:
"""视频上传模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def upload_video(self, video_path: str) -> bool:
"""上传视频文件"""
try:
print(f"开始上传视频: {video_path}")
# 查找隐藏的文件输入框
file_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@style='display: none;' or @type='file']"),
timeout=15
)
if file_input:
file_input.send_keys(video_path)
print("视频文件已发送到输入框")
# 等待上传完成的标志
upload_success = self._wait_for_upload_complete()
if upload_success:
print("视频上传完成")
return True
else:
print("视频上传可能未完成")
return False
else:
print("未找到文件输入框")
return False
except Exception as e:
print(f"视频上传失败: {e}")
return False
def _wait_for_upload_complete(self, timeout=60) -> bool:
"""等待视频上传完成"""
try:
# 可以通过检查进度条、上传状态等来判断是否完成
# 这里使用简单的时间等待,实际可以更智能
time.sleep(10) # 基础等待时间
# 尝试检查是否有上传进度指示器
for i in range(timeout // 5):
try:
# 检查是否有上传完成的标志
progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]")
if "100%" in progress_element.text or "完成" in progress_element.text:
return True
except NoSuchElementException:
pass
time.sleep(5)
return True # 如果没有找到进度指示器,假设上传完成
except Exception as e:
print(f"等待上传完成时出错: {e}")
return True
class ContentEditor:
"""内容编辑模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def set_title(self, title: str) -> bool:
"""设置视频标题"""
title_locator = (By.XPATH, "//input[@placeholder='填写作品标题,为作品获得更多流量']")
success = self.utils.safe_send_keys(self.driver, title_locator, title)
if success:
print(f"标题设置成功: {title}")
return success
def set_description(self, description: str) -> bool:
"""设置视频描述"""
desc_locator = (By.XPATH, "//div[@data-placeholder='添加作品简介']")
success = self.utils.safe_send_keys(self.driver, desc_locator, description)
if success:
print(f"描述设置成功: {description}")
return success
def set_content(self, title: str, description: str = "") -> bool:
"""设置标题和描述"""
title_success = self.set_title(title)
desc_success = self.set_description(description) if description else True
return title_success and desc_success
class ProductLinker:
"""商品挂载模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def select_group_buy(self, product_name: str, product_info: str) -> bool:
"""选择团购商品"""
try:
print(f"开始选择团购商品: {product_name}")
# 1. 点击下拉框选择团购类型
if not self._select_content_type():
return False
# 2. 选择具体商品
if not self._select_product(product_name):
return False
# 3. 输入商品信息
if not self._input_product_info(product_info):
return False
# 4. 确认选择
if not self._confirm_selection():
return False
print("团购商品选择完成")
return True
except Exception as e:
print(f"选择团购商品失败: {e}")
return False
def _select_content_type(self) -> bool:
"""选择内容类型为团购"""
# 点击下拉框
tab_selector_locator = (By.XPATH, "//div[@class='semi-select select-lJTtRL semi-select-single']")
if not self.utils.safe_click(self.driver, tab_selector_locator, use_js=True):
return False
time.sleep(1) # 等待下拉框展开
# 选择团购选项
group_buy_locator = (By.XPATH, "//div[@data-code='13010' and @class='select-dropdown-option-video']")
return self.utils.safe_click(self.driver, group_buy_locator, use_js=True)
def _select_product(self, product_name: str) -> bool:
"""选择具体商品"""
# 点击商品选择下拉框
product_selector_locator = (By.XPATH, "//div[@class='semi-select select-Qm4u8S semi-select-single semi-select-filterable']")
if not self.utils.safe_click(self.driver, product_selector_locator, use_js=True):
return False
time.sleep(1)
# 输入商品名称进行搜索
product_input_locator = (By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='']")
if not self.utils.safe_send_keys(self.driver, product_input_locator, product_name, clear_first=False):
return False
time.sleep(2) # 等待搜索结果
# 选择匹配的商品
return self._click_matching_product_option(product_name)
def _click_matching_product_option(self, product_name: str, max_retries: int = 10) -> bool:
"""点击匹配的商品选项"""
for attempt in range(max_retries):
clicked = self.driver.execute_script("""
let elements = document.querySelectorAll('.semi-select-option');
for (let el of elements) {
if (el.textContent.includes(arguments[0])) {
el.click();
return true;
}
}
return false;
""", product_name)
if clicked:
print(f"成功选择商品,尝试次数: {attempt + 1}")
return True
else:
print(f"等待商品选项,尝试次数: {attempt + 1}/{max_retries}")
time.sleep(1)
print("未能找到匹配的商品选项")
return False
def _input_product_info(self, product_info: str) -> bool:
"""输入商品信息"""
product_info_locator = (By.XPATH, "//input[@class='semi-input semi-input-default' and contains(@placeholder, '如:')]")
return self.utils.safe_send_keys(self.driver, product_info_locator, product_info)
def _confirm_selection(self) -> bool:
"""确认商品选择"""
confirm_locator = (By.XPATH, "//div[@class='footer-button-DL8zDh']")
return self.utils.safe_click(self.driver, confirm_locator, use_js=True)
class ScheduleManager:
"""定时发布管理模块"""
def __init__(self, driver):
self.driver = driver
self.utils = DouyinUtils()
def set_schedule_time(self, schedule_time: str) -> bool:
"""设置定时发布时间
Args:
schedule_time: 时间字符串,格式如 '2025-06-05 12:30'
"""
try:
print(f"设置定时发布: {schedule_time}")
# 1. 选择定时发布选项
if not self._select_schedule_option():
return False
# 2. 输入时间
if not self._input_schedule_time(schedule_time):
return False
print("定时发布设置完成")
return True
except Exception as e:
print(f"设置定时发布失败: {e}")
return False
def _select_schedule_option(self) -> bool:
"""选择定时发布选项"""
schedule_locator = (By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label")
return self.utils.safe_click(self.driver, schedule_locator, use_js=True)
def _input_schedule_time(self, time_str: str) -> bool:
"""输入定时发布时间"""
try:
time_input = self.utils.wait_for_element(
self.driver,
(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']")
)
if time_input:
# 使用JavaScript设置时间值确保触发相关事件
self.driver.execute_script("""
const input = arguments[0];
const dateValue = arguments[1];
input.value = dateValue;
// 触发事件确保组件更新
const inputEvent = new Event('input', { bubbles: true });
const changeEvent = new Event('change', { bubbles: true });
input.dispatchEvent(inputEvent);
input.dispatchEvent(changeEvent);
""", time_input, time_str)
return True
return False
except Exception as e:
print(f"输入定时时间失败: {e}")
return False
class DouyinScheduler:
"""抖音发布调度器主类"""
def __init__(self, cookies_path: str, browser_type: str = "safari"):
self.driver = self._init_driver(browser_type)
self.cookies_path = cookies_path
# 初始化各个模块
self.video_uploader = VideoUploader(self.driver)
self.content_editor = ContentEditor(self.driver)
self.product_linker = ProductLinker(self.driver)
self.schedule_manager = ScheduleManager(self.driver)
self.utils = DouyinUtils()
# 初始化浏览器和登录
self._setup_browser()
def _init_driver(self, browser_type: str):
"""初始化浏览器驱动"""
if browser_type.lower() == "safari":
driver = webdriver.Safari()
elif browser_type.lower() == "chrome":
driver = webdriver.Chrome()
elif browser_type.lower() == "firefox":
driver = webdriver.Firefox()
else:
raise ValueError(f"不支持的浏览器类型: {browser_type}")
driver.maximize_window()
driver.set_page_load_timeout(30)
return driver
def _setup_browser(self):
"""设置浏览器并加载cookies"""
# 访问抖音创作页面
self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web")
self.utils.smart_wait(self.driver, 5)
# 加载cookies
self._load_cookies()
# 刷新页面
self.driver.refresh()
self.utils.smart_wait(self.driver, 5)
def _load_cookies(self):
"""加载cookies"""
try:
with open(self.cookies_path, 'r') as f:
cookies = json.load(f)
for cookie in cookies:
# 移除可能导致问题的属性
for attr in ['sameSite', 'expiry']:
cookie.pop(attr, None)
try:
self.driver.add_cookie(cookie)
except Exception as e:
print(f"添加cookie失败: {e}")
print("Cookies加载完成")
except Exception as e:
print(f"加载cookies失败: {e}")
def publish_video_with_product(self,
video_path: str,
title: str,
description: str = "",
product_name: str = None,
product_info: str = None,
schedule_time: str = None) -> bool:
"""发布带商品的视频
Args:
video_path: 视频文件路径
title: 视频标题
description: 视频描述
product_name: 商品名称
product_info: 商品信息
schedule_time: 定时发布时间,格式如 '2025-06-05 12:30'
"""
try:
print("开始发布流程...")
# 1. 上传视频
if not self.video_uploader.upload_video(video_path):
print("视频上传失败")
return False
# 2. 设置内容
if not self.content_editor.set_content(title, description):
print("内容设置失败")
return False
# 3. 挂载商品(如果提供)
if product_name and product_info:
if not self.product_linker.select_group_buy(product_name, product_info):
print("商品挂载失败")
return False
# 4. 设置定时发布(如果提供)
if schedule_time:
if not self.schedule_manager.set_schedule_time(schedule_time):
print("定时设置失败")
return False
# 5. 发布
return self._publish()
except Exception as e:
print(f"发布流程失败: {e}")
return False
def _publish(self) -> bool:
"""执行发布操作"""
try:
publish_locator = (By.XPATH, "//button[@class='button-dhlUZE primary-cECiOJ fixed-J9O8Yw']")
success = self.utils.safe_click(self.driver, publish_locator, use_js=True)
if success:
print("发布成功!")
return True
else:
print("点击发布按钮失败")
return False
except Exception as e:
print(f"发布失败: {e}")
return False
def wait_for_user_confirmation(self, message: str = "按回车键继续..."):
"""等待用户确认"""
input(message)
def close(self):
"""关闭浏览器"""
if self.driver:
self.driver.quit()
print("浏览器已关闭")
# 配置类
class DouyinConfig:
"""配置管理类"""
def __init__(self, config_file: str = None):
self.config = {}
if config_file:
self.load_config(config_file)
def load_config(self, config_file: str):
"""从文件加载配置"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
except Exception as e:
print(f"加载配置文件失败: {e}")
def get(self, key: str, default=None):
"""获取配置值"""
return self.config.get(key, default)
def set(self, key: str, value):
"""设置配置值"""
self.config[key] = value
def save_config(self, config_file: str):
"""保存配置到文件"""
try:
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"保存配置文件失败: {e}")
if __name__ == "__main__":
# 使用示例
scheduler = DouyinScheduler("cookies.json", "safari")
try:
# 发布视频示例
success = scheduler.publish_video_with_product(
video_path="/Users/yarrow/autoPublisher/video/gdsc/2011_1749089195_raw.mp4",
title="广州周末亲子游",
description="带孩子探索科学的奥秘 #广州亲子游 #科学中心",
product_name="广东科学中心门票--亲子1大1小票",
product_info="广东科学中心门票",
schedule_time="2025-06-05 12:30"
)
if success:
print("发布流程完成")
else:
print("发布流程失败")
# 等待用户确认
scheduler.wait_for_user_confirmation("按回车键退出...")
finally:
scheduler.close()