tiktokAutoPublisher/douyin_publisher.py

421 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import json
import base64
import time
class DouyinPublisher():
def __init__(self, driver, cookies_path):
self.driver = driver
self.cookies_path = cookies_path
self.cookies = json.loads(open(self.cookies_path, "r").read())
# 先访问抖音网站
self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web")
time.sleep(3) # 等待页面加载
# 然后加载cookies
self.load_cookies()
self.WEB_URL = "https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web"
self.driver.get(self.WEB_URL)
def load_cookies(self):
try:
for cookie in self.cookies:
# 移除可能导致问题的属性
for attr in ['sameSite', 'expiry']:
if attr in cookie:
del cookie[attr]
try:
self.driver.add_cookie(cookie)
except Exception as e:
print(f"添加cookie失败: {e}")
print("cookies加载完成")
# 刷新页面使cookies生效
self.driver.refresh()
time.sleep(3) # 等待刷新完成
except Exception as e:
print(f"添加cookies过程中发生错误: {e}")
return True
def open_web(self):
self.driver.get(self.WEB_URL)
# 等待页面加载完成
time.sleep(5)
return True
def select_upload_type(self, upload_type):
# 注意以下XPath选择器需要根据实际网页结构调整
if upload_type == "image":
# 图文
try:
self.driver.find_element(By.XPATH, "//div[contains(text(), '图文')]").click()
except:
print("无法找到图文上传按钮请检查XPath")
elif upload_type == "video":
# 视频
try:
self.driver.find_element(By.XPATH, "//div[contains(text(), '视频')]").click()
except:
print("无法找到视频上传按钮请检查XPath")
return True
def upload_image(self, image_path):
# 注意以下XPath选择器需要根据实际网页结构调整
try:
# 找到文件上传输入框
file_input = self.driver.find_element(By.XPATH, "//input[@type='file' and @accept='.jpg,.jpeg,.png']")
file_input.send_keys(image_path)
except Exception as e:
print(f"上传图片失败: {e}")
# 等待上传完成
time.sleep(5)
return True
def upload_video(self, video_path):
# 注意以下XPath选择器需要根据实际网页结构调整
try:
# 找到文件上传输入框
# 首先尝试点击上传视频按钮(如果需要)
try:
# 尝试通过文本内容定位上传视频按钮
upload_btn = self.driver.find_element(By.XPATH, "//span[contains(text(), '上传视频')]")
upload_btn.click()
time.sleep(1) # 等待文件选择框出现
except Exception as e:
print(f"点击上传视频按钮失败,尝试直接查找文件输入框: {e}")
# 然后找到文件输入框并上传文件
file_input = self.driver.find_element(By.XPATH, "//input[@type='file' and @accept='.mp4,.mov,.wmv,.avi,.m4v']")
file_input.send_keys(video_path)
except Exception as e:
print(f"上传视频失败: {e}")
# 等待上传完成
time.sleep(10) # 视频上传可能需要更长时间
return True
def publish(self, title, content):
# 注意以下XPath选择器需要根据实际网页结构调整
try:
# 输入标题
title_input = self.driver.find_element(By.XPATH, "//input[@placeholder='请输入标题']")
title_input.clear()
title_input.send_keys(title)
# 输入内容/描述
content_input = self.driver.find_element(By.XPATH, "//div[contains(@class, 'editor')]")
content_input.clear()
content_input.send_keys(content)
except Exception as e:
print(f"输入标题和内容失败: {e}")
return True
def click_publish(self):
# 注意以下XPath选择器需要根据实际网页结构调整
try:
# 找到并点击发布按钮
publish_button = self.driver.find_element(By.XPATH, "//button[contains(text(), '发布')]")
publish_button.click()
# 等待发布完成
time.sleep(5)
# 有时候可能会有确认弹窗
try:
confirm_button = self.driver.find_element(By.XPATH, "//button[contains(text(), '确定')]")
confirm_button.click()
except:
pass # 如果没有确认弹窗,则忽略
except Exception as e:
print(f"点击发布按钮失败: {e}")
return True
def publish_article(self, title, content, image_path):
self.select_upload_type("image")
self.upload_image(image_path)
self.publish(title, content)
self.click_publish()
return True
def publish_video(self, title, content, video_path):
self.select_upload_type("video")
self.upload_video(video_path)
self.publish(title, content)
self.click_publish()
return True
def work(self, type, title, content, image_path=None, video_path=None):
try:
self.open_web()
self.driver.maximize_window()
self.driver.implicitly_wait(10)
if type == "article" and image_path:
self.publish_article(title, content, image_path)
elif type == "video" and video_path:
self.publish_video(title, content, video_path)
else:
raise ValueError("Invalid type or missing file path")
print("发布完成!")
except Exception as e:
print(f"发布失败: {e}")
finally:
# 可以选择不立即退出,以便查看结果
# self.driver.quit()
pass
return True
# 新增方法:封装团购选择流程
def select_group_buy(self, product_name, product_info):
try:
# 下拉框选择
tab_selector = self.driver.find_element(By.XPATH, "//div[@class='semi-select select-lJTtRL semi-select-single']")
self.driver.execute_script("arguments[0].click();", tab_selector)
time.sleep(2)
# 选择团购
tab_selector_option = self.driver.find_element(By.XPATH, "//div[@data-code='13010' and @class='select-dropdown-option-video']")
self.driver.execute_script("arguments[0].click();", tab_selector_option)
time.sleep(2)
# 点击产品选择下拉框
product_selector = self.driver.find_element(By.XPATH, "//div[@class='semi-select select-Qm4u8S semi-select-single semi-select-filterable']")
self.driver.execute_script("arguments[0].click();", product_selector)
time.sleep(2)
# 输入产品名称
product_input = self.driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='']")
product_input.send_keys(product_name)
time.sleep(5)
# 选择产品
max_retries = 10
for attempt in range(max_retries):
clicked = self.driver.execute_script("""
let elements = document.querySelectorAll('.semi-select-option');
for (let el of elements) {
if (el.textContent.includes(arguments[0])) {
el.click();
return true;
}
}
return false;
""", product_name)
if clicked:
print(f"成功点击产品选项,尝试次数: {attempt+1}")
break
else:
print(f"等待产品选项出现,尝试次数: {attempt+1}/{max_retries}")
time.sleep(1)
time.sleep(2)
# 输入产品信息
product_info_input = self.driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='如:海底捞超值双人套餐']")
product_info_input.send_keys(product_info)
time.sleep(2)
# 点击确认按钮
confirm_button = self.driver.find_element(By.XPATH, "//div[@class='footer-button-DL8zDh']")
self.driver.execute_script("arguments[0].click();", confirm_button)
time.sleep(2)
return True
except Exception as e:
print(f"选择团购失败: {e}")
return False
# 新增方法:设置定时发布
def set_schedule_time(self, time_str):
try:
# 选择定时发布
schedule_label = self.driver.find_element(By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label")
self.driver.execute_script("arguments[0].click();", schedule_label)
time.sleep(2)
# 输入时间
time_inputor = self.driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']")
time_inputor.send_keys(time_str)
time.sleep(2)
return True
except Exception as e:
print(f"设置定时发布失败: {e}")
return False
# 新增方法:最后确认接口
def final_confirmation(self, prompt="按任意键继续..."):
input(prompt)
return True
if __name__ == "__main__":
# 使用Safari浏览器
driver = webdriver.Safari()
# 也可以根据需要选择其他浏览器
# driver = webdriver.Chrome()
# driver = webdriver.Firefox()
# 设置浏览器全屏
driver.maximize_window()
# 设置页面加载超时时间
driver.set_page_load_timeout(30)
# cookies路径需要先获取登录后的cookies
cookies_path = "cookies.json"
# 创建抖音发布器实例
publisher = DouyinPublisher(driver, cookies_path)
time.sleep(10)
# <button class="semi-button semi-button-primary container-drag-btn-k6XmB4 semi-button-with-icon" type="button"><span class="semi-button-content"><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><rect width="16.333" height="13" x="1.833" y="3.5" stroke="#fff" stroke-width="2" rx="2"></rect><path stroke="#fff" stroke-linecap="round" stroke-width="2" d="M10 7.5v5M12.5 10h-5"></path></svg><span class="semi-button-content-right">上传视频</span></span></button>
# 上传视频按钮
video_path = "/Users/yarrow/autoPublisher/8970_1747905081.mp4"
file_input = driver.find_element(By.XPATH, "//input[@style='display: none;']")
file_input.send_keys(video_path)
time.sleep(10)
print("视频上传完成")
# input("按任意键继续...")
title_input = driver.find_element(By.XPATH, "//input[@placeholder='填写作品标题,为作品获得更多流量']")
title_input.send_keys("测试标题")
content_input = driver.find_element(By.XPATH, "//div[@data-placeholder='添加作品简介']")
content_input.send_keys("测试内容")
print("内容输入完成")
# input("按任意键继续...")
# 下拉框选择
tab_selector = driver.find_element(By.XPATH, "//div[@class='semi-select select-lJTtRL semi-select-single']")
# 方法1使用JavaScript执行点击
driver.execute_script("arguments[0].click();", tab_selector)
# 如果上面方法不生效,可以尝试以下备选方法:
# 方法2使用Actions类进行点击
# from selenium.webdriver.common.action_chains import ActionChains
# ActionChains(driver).move_to_element(tab_selector).click().perform()
# 方法3使用显式等待确保元素可点击
# WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//div[@class='semi-select select-lJTtRL semi-select-single']"))).click()
time.sleep(2)
print("下拉框选择完成")
# input("按任意键继续...")
tab_selector_option = driver.find_element(By.XPATH, "//div[@class='semi-select-option-list semi-select-option-list-chosen']")
## 选择团购
tab_selector_option = driver.find_element(By.XPATH, "//div[@data-code='13010' and @class='select-dropdown-option-video']")
driver.execute_script("arguments[0].click();", tab_selector_option)
time.sleep(2)
print("团购选择完成")
# input("按任意键继续...")
## 点击新的下拉框
product_selector = driver.find_element(By.XPATH, "//div[@class='semi-select select-Qm4u8S semi-select-single semi-select-filterable']")
driver.execute_script("arguments[0].click();", product_selector)
time.sleep(2)
print("新的下拉框选择完成")
# input("按任意键继续...")
product_input = driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='']")
product_input.send_keys("古龙峡漂流|国际(勇猛)赛道全程漂流票 成人票")
print("产品输入完成")
# input("按任意键继续...")
time.sleep(5)
## 下拉框选择这里有问题 找不到下拉框
# 添加等待和重试机制
max_retries = 10
for attempt in range(max_retries):
# 使用JavaScript直接查找并点击元素与控制台操作相同
clicked = driver.execute_script("""
// 尝试精确匹配类名
let elements = document.getElementsByClassName('semi-select-option semi-select-option-focused option-v2-U5PXMk');
if (elements.length > 0) {
elements[0].click();
return true;
}
// 备选方案1使用更宽泛的类名查找
elements = document.querySelectorAll('.semi-select-option');
for (let el of elements) {
if (el.textContent.includes('古龙峡漂流')) {
el.click();
return true;
}
}
// 备选方案2使用aria-role查找
elements = document.querySelectorAll('[role="option"]');
for (let el of elements) {
if (el.textContent.includes('古龙峡漂流')) {
el.click();
return true;
}
}
return false;
""")
if clicked:
print(f"成功点击产品选项,尝试次数: {attempt+1}")
break
else:
print(f"等待产品选项出现,尝试次数: {attempt+1}/{max_retries}")
time.sleep(1) # 等待1秒后重试
# 等待一会,让点击生效
time.sleep(2)
print("产品选择完成")
# input("按任意键继续...")
product_info_input = driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @placeholder='如:海底捞超值双人套餐']")
product_info_input.send_keys("古龙峡漂流成人票")
print("产品信息输入完成")
# input("按任意键继续...")
confirm_button = driver.find_element(By.XPATH, "//div[@class='footer-button-DL8zDh']")
driver.execute_script("arguments[0].click();", confirm_button)
time.sleep(2)
print("确认完成")
# input("按任意键继续...")
# 例如选择"定时发布"选项
schedule_label = driver.find_element(By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label")
driver.execute_script("arguments[0].click();", schedule_label)
# driver.execute_script("arguments[0].click();", time_selector_label)
time.sleep(2)
print("定时发布切换完成")
time_inputor = driver.find_element(By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']")
time_inputor.send_keys("2025-05-27 10:00")
time.sleep(10)
print("定时发布选择完成")
input("按任意键继续...")
# 发布图文
# publisher.work(type="article", title="测试标题", content="测试内容 #测试话题", image_path="/path/to/your/image.jpg")
# 发布视频
# publisher.work(type="video", title="测试视频", content="测试视频内容 #测试话题", video_path="/Users/mac/Desktop/test.mp4")
# 发布后保持浏览器窗口开启,方便检查结果
input("按任意键退出...")
driver.quit()