diff --git a/douyin_scheduler.py b/douyin_scheduler.py index ac37e8e..1257cda 100644 --- a/douyin_scheduler.py +++ b/douyin_scheduler.py @@ -5,6 +5,7 @@ import json import time +import random from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from selenium import webdriver @@ -13,6 +14,8 @@ from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.action_chains import ActionChains from selenium.common.exceptions import TimeoutException, NoSuchElementException +import os +import base64 class DouyinUtils: @@ -110,18 +113,18 @@ class VideoUploader: # 可以通过检查进度条、上传状态等来判断是否完成 # 这里使用简单的时间等待,实际可以更智能 time.sleep(10) # 基础等待时间 - + ## TODO: 这个要重新做一下 # 尝试检查是否有上传进度指示器 - for i in range(timeout // 5): - try: - # 检查是否有上传完成的标志 - progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]") - if "100%" in progress_element.text or "完成" in progress_element.text: - return True - except NoSuchElementException: - pass + # for i in range(timeout // 5): + # try: + # # 检查是否有上传完成的标志 + # progress_element = self.driver.find_element(By.XPATH, "//div[contains(@class, 'progress') or contains(@class, 'upload')]") + # if "100%" in progress_element.text or "完成" in progress_element.text: + # return True + # except NoSuchElementException: + # pass - time.sleep(5) + # time.sleep(5) return True # 如果没有找到进度指示器,假设上传完成 @@ -189,6 +192,7 @@ class ProductLinker: return False print("团购商品选择完成") + time.sleep(random.random()*1.5) return True except Exception as e: @@ -261,6 +265,238 @@ class ProductLinker: confirm_locator = (By.XPATH, "//div[@class='footer-button-DL8zDh']") return self.utils.safe_click(self.driver, confirm_locator, use_js=True) +class PostManager: + """海报设定模块""" + + def __init__(self, driver): + self.driver = driver + self.utils = DouyinUtils() + + def _click_poster_setting(self) -> bool: + """点击海报设定""" + try: + # 尝试多个可能的海报设置按钮选择器 + poster_selectors = [ + (By.XPATH, "//div[@class='background-OpVteV filter-k_CjvJ']"), + # (By.XPATH, "//div[contains(@class, 'background') and contains(@class, 'filter')]"), + # (By.XPATH, "//div[contains(text(), '设置封面') or contains(text(), '海报')]"), + # (By.CSS_SELECTOR, ".background-OpVteV.filter-k_CjvJ") + ] + + for selector in poster_selectors: + if self.utils.safe_click(self.driver, selector, use_js=True): + print("海报设置按钮点击成功") + time.sleep(1) # 等待面板打开 + return True + + print("未找到海报设置按钮") + return False + + except Exception as e: + print(f"点击海报设置按钮失败: {e}") + return False + + def _upload_poster_file(self, poster_path: str) -> bool: + """上传海报文件""" + try: + print(f"开始上传海报文件: {poster_path}") + + # 检查文件是否存在 + if not os.path.exists(poster_path): + print(f"海报文件不存在: {poster_path}") + return False + + # 方法1: 使用send_keys方式(首选) + if self._upload_via_send_keys(poster_path): + print("使用send_keys方式上传成功") + return True + + # 方法2: 使用base64编码方式(备选) + print("send_keys方式失败,尝试base64方式...") + if self._upload_via_base64(poster_path): + print("使用base64方式上传成功") + return True + + print("所有上传方式都失败了") + return False + + except Exception as e: + print(f"上传海报文件失败: {e}") + return False + + def _upload_via_send_keys(self, poster_path: str) -> bool: + """使用send_keys方式上传文件""" + try: + # 查找隐藏的文件上传输入框,模仿视频上传的方式 + file_input_selectors = [ + # 海报上传的隐藏input元素(主要选择器) + (By.XPATH, "//input[@class='semi-upload-hidden-input' and @type='file']"), + (By.XPATH, "//input[contains(@class, 'semi-upload-hidden-input')]"), + # 通用的隐藏文件输入框 + (By.XPATH, "//input[@type='file' and contains(@accept, 'image')]"), + (By.XPATH, "//input[@type='file' and @style='display: none;']"), + # 备选方案 + (By.XPATH, "//div[contains(@class, 'semi-upload')]//input[@type='file']"), + (By.CSS_SELECTOR, "input[type='file'].semi-upload-hidden-input") + ] + + file_input = None + for selector in file_input_selectors: + file_input = self.utils.wait_for_element(self.driver, selector, timeout=5) + if file_input: + print(f"找到文件输入框: {selector}") + break + + if file_input: + # 直接发送文件路径(模仿视频上传方式) + file_input.send_keys(poster_path) + print("海报文件已通过send_keys发送到输入框") + + # 等待上传完成 + time.sleep(3) + return True + else: + print("未找到文件上传输入框") + return False + + except Exception as e: + print(f"send_keys上传失败: {e}") + return False + + def _upload_via_base64(self, poster_path: str) -> bool: + """使用base64编码方式上传文件""" + try: + # 读取文件并转换为base64 + with open(poster_path, 'rb') as f: + file_content = f.read() + base64_content = base64.b64encode(file_content).decode('utf-8') + + # 获取文件类型 + file_extension = poster_path.lower().split('.')[-1] + mime_types = { + 'jpg': 'image/jpeg', + 'jpeg': 'image/jpeg', + 'png': 'image/png', + 'gif': 'image/gif', + 'webp': 'image/webp' + } + mime_type = mime_types.get(file_extension, 'image/jpeg') + + # 构造data URL + data_url = f"data:{mime_type};base64,{base64_content}" + + # 尝试通过JavaScript上传 + upload_success = self.driver.execute_script(""" + // 查找上传区域 + const uploadArea = document.querySelector('.semi-upload-drag-area') || + document.querySelector('[class*="upload"]') || + document.querySelector('input[type="file"]'); + + if (!uploadArea) { + return false; + } + + // 创建文件对象 + function dataURLtoFile(dataurl, filename) { + const arr = dataurl.split(','); + const mime = arr[0].match(/:(.*?);/)[1]; + const bstr = atob(arr[1]); + let n = bstr.length; + const u8arr = new Uint8Array(n); + while(n--) { + u8arr[n] = bstr.charCodeAt(n); + } + return new File([u8arr], filename, {type: mime}); + } + + // 转换为File对象 + const file = dataURLtoFile(arguments[0], arguments[1]); + + // 创建FileList + const dt = new DataTransfer(); + dt.items.add(file); + + // 查找input[type="file"]元素 + const fileInput = document.querySelector('input[type="file"]'); + if (fileInput) { + fileInput.files = dt.files; + + // 触发change事件 + const event = new Event('change', { bubbles: true }); + fileInput.dispatchEvent(event); + + return true; + } + + return false; + """, data_url, os.path.basename(poster_path)) + + if upload_success: + print("base64上传触发成功") + time.sleep(3) # 等待上传处理 + return True + else: + print("base64上传触发失败") + return False + + except Exception as e: + print(f"base64上传失败: {e}") + return False + + def _confirm_poster_setting(self) -> bool: + """确认海报设定""" + try: + # 等待一段时间让上传完成 + time.sleep(2) + + # 尝试多个可能的确认按钮选择器 + confirm_selectors = [ + (By.XPATH, "//button[@class='semi-button semi-button-primary semi-button-light primary-RstHX_']"), + (By.XPATH, "//button[contains(@class, 'semi-button-primary') and (contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存'))]"), + (By.XPATH, "//button[contains(text(), '确定') or contains(text(), '确认') or contains(text(), '保存')]"), + (By.CSS_SELECTOR, ".semi-button-primary") + ] + + for selector in confirm_selectors: + if self.utils.safe_click(self.driver, selector, use_js=True): + print("海报设置确认成功") + time.sleep(1) + return True + + print("未找到确认按钮,可能已自动保存") + return True # 有些情况下可能自动保存,不需要确认按钮 + + except Exception as e: + print(f"确认海报设定失败: {e}") + return False + + def set_poster(self, poster_path: str) -> bool: + """设置海报的完整流程""" + try: + print(f"开始设置海报: {poster_path}") + + # 1. 点击海报设置按钮 + if not self._click_poster_setting(): + print("点击海报设置按钮失败") + return False + + # 2. 上传海报文件 + if not self._upload_poster_file(poster_path): + print("上传海报文件失败") + return False + + # 3. 确认设置 + if not self._confirm_poster_setting(): + print("确认海报设置失败") + return False + + print("海报设置完成") + time.sleep(random.random() * 1.5) + return True + + except Exception as e: + print(f"设置海报失败: {e}") + return False class ScheduleManager: """定时发布管理模块""" @@ -280,13 +516,29 @@ class ScheduleManager: # 1. 选择定时发布选项 if not self._select_schedule_option(): + print("选择定时发布选项失败") return False - # 2. 输入时间 - if not self._input_schedule_time(schedule_time): + # 2. 等待定时发布界面加载完成 + if not self._wait_for_schedule_interface(): + print("等待定时发布界面超时") return False + # 3. 输入时间 + if not self._input_schedule_time(schedule_time): + print("输入定时时间失败") + return False + + # 4. 验证时间是否设置正确 + if not self._verify_schedule_time(schedule_time): + print("时间验证失败,尝试重新设置") + # 重试一次 + if not self._input_schedule_time(schedule_time): + print("重试输入时间失败") + return False + print("定时发布设置完成") + time.sleep(random.random() * 1.5) return True except Exception as e: @@ -295,38 +547,157 @@ class ScheduleManager: def _select_schedule_option(self) -> bool: """选择定时发布选项""" - schedule_locator = (By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label") - return self.utils.safe_click(self.driver, schedule_locator, use_js=True) + try: + # 尝试多个可能的定时发布选择器 + schedule_selectors = [ + (By.XPATH, "//label[contains(@class, 'radio-d4zkru')]//span[contains(text(), '定时发布')]/ancestor::label"), + (By.XPATH, "//label[contains(@class, 'radio')]//span[contains(text(), '定时发布')]/ancestor::label"), + (By.XPATH, "//span[contains(text(), '定时发布')]/ancestor::label"), + (By.XPATH, "//input[@type='radio' and @value='schedule']/../label") + ] + + for selector in schedule_selectors: + if self.utils.safe_click(self.driver, selector, use_js=True): + print("成功选择定时发布选项") + time.sleep(2) # 增加等待时间,让界面完全加载 + return True + + print("未找到定时发布选项") + return False + + except Exception as e: + print(f"选择定时发布选项失败: {e}") + return False + + def _wait_for_schedule_interface(self) -> bool: + """等待定时发布界面完全加载""" + try: + print("等待定时发布界面加载...") + + # 等待时间输入框出现 + time_input_selectors = [ + (By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"), + (By.XPATH, "//input[contains(@placeholder, '日期和时间')]"), + (By.XPATH, "//input[contains(@placeholder, '选择日期')]"), + (By.CSS_SELECTOR, "input[placeholder*='日期']") + ] + + for selector in time_input_selectors: + element = self.utils.wait_for_element(self.driver, selector, timeout=10) + if element: + print("定时发布界面加载完成") + time.sleep(1) # 额外等待确保界面稳定 + return True + + print("定时发布界面加载超时") + return False + + except Exception as e: + print(f"等待定时发布界面失败: {e}") + return False def _input_schedule_time(self, time_str: str) -> bool: """输入定时发布时间""" try: - time_input = self.utils.wait_for_element( - self.driver, - (By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']") - ) + print(f"开始输入时间: {time_str}") - if time_input: - # 使用JavaScript设置时间值,确保触发相关事件 + # 查找时间输入框 + time_input_selectors = [ + (By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"), + (By.XPATH, "//input[contains(@placeholder, '日期和时间')]"), + (By.XPATH, "//input[contains(@placeholder, '选择日期')]") + ] + + time_input = None + for selector in time_input_selectors: + time_input = self.utils.wait_for_element(self.driver, selector, timeout=5) + if time_input: + print(f"找到时间输入框: {selector}") + break + + if not time_input: + print("未找到时间输入框") + return False + + # 清空输入框 + time_input.clear() + time.sleep(0.5) + + # 方法1: 直接输入 + try: + time_input.send_keys(time_str) + time.sleep(1) + print("使用send_keys方法输入时间") + except Exception as e: + print(f"send_keys输入失败: {e}") + + # 方法2: 使用JavaScript设置值(作为备选) + try: self.driver.execute_script(""" const input = arguments[0]; const dateValue = arguments[1]; + // 清空并设置值 + input.value = ''; + input.focus(); input.value = dateValue; - // 触发事件确保组件更新 - const inputEvent = new Event('input', { bubbles: true }); - const changeEvent = new Event('change', { bubbles: true }); - input.dispatchEvent(inputEvent); - input.dispatchEvent(changeEvent); + // 触发多种事件确保组件更新 + const events = ['input', 'change', 'blur', 'keyup']; + events.forEach(eventType => { + const event = new Event(eventType, { bubbles: true }); + input.dispatchEvent(event); + }); + + // 模拟失去焦点 + input.blur(); """, time_input, time_str) - return True - return False + time.sleep(1) + print("使用JavaScript方法设置时间") + + except Exception as e: + print(f"JavaScript设置时间失败: {e}") + + # 等待界面处理 + time.sleep(2) + return True except Exception as e: print(f"输入定时时间失败: {e}") return False + + def _verify_schedule_time(self, expected_time: str) -> bool: + """验证设置的时间是否正确""" + try: + print(f"验证设置的时间: {expected_time}") + + # 查找时间输入框并检查其值 + time_input = self.utils.wait_for_element( + self.driver, + (By.XPATH, "//input[@class='semi-input semi-input-default' and @type='text' and @placeholder='日期和时间']"), + timeout=5 + ) + + if time_input: + actual_value = time_input.get_attribute('value') + print(f"输入框实际值: {actual_value}") + print(f"期望值: {expected_time}") + + # 简单的值比较(可能需要根据实际格式调整) + if actual_value and (expected_time in actual_value or actual_value in expected_time): + print("时间验证成功") + return True + else: + print(f"时间验证失败: 期望 {expected_time}, 实际 {actual_value}") + return False + else: + print("无法找到时间输入框进行验证") + return False + + except Exception as e: + print(f"验证时间失败: {e}") + return False class DouyinScheduler: @@ -341,6 +712,7 @@ class DouyinScheduler: self.content_editor = ContentEditor(self.driver) self.product_linker = ProductLinker(self.driver) self.schedule_manager = ScheduleManager(self.driver) + self.poster_manager = PostManager(self.driver) self.utils = DouyinUtils() # 初始化浏览器和登录 @@ -373,7 +745,8 @@ class DouyinScheduler: # 刷新页面 self.driver.refresh() self.utils.smart_wait(self.driver, 5) - + self.driver.get("https://creator.douyin.com/creator-micro/content/upload?enter_from=dou_web") + self.utils.smart_wait(self.driver, 5) def _load_cookies(self): """加载cookies""" try: @@ -399,6 +772,7 @@ class DouyinScheduler: video_path: str, title: str, description: str = "", + poster_path: str = None, product_name: str = None, product_info: str = None, schedule_time: str = None) -> bool: @@ -431,13 +805,19 @@ class DouyinScheduler: print("商品挂载失败") return False - # 4. 设置定时发布(如果提供) + # 4. 设置海报(如果提供) + if poster_path: + if not self.poster_manager.set_poster(poster_path): + print("海报设置失败") + return False + + # 5. 设置定时发布(如果提供) if schedule_time: if not self.schedule_manager.set_schedule_time(schedule_time): print("定时设置失败") return False - # 5. 发布 + # 6. 发布 return self._publish() except Exception as e: @@ -448,6 +828,13 @@ class DouyinScheduler: """执行发布操作""" try: publish_locator = (By.XPATH, "//button[@class='button-dhlUZE primary-cECiOJ fixed-J9O8Yw']") + publish_button = self.utils.wait_for_element(self.driver, publish_locator) + if publish_button: + print("发布按钮已找到") + else: + print("发布按钮未找到") + return False + success = self.utils.safe_click(self.driver, publish_locator, use_js=True) if success: @@ -508,26 +895,37 @@ class DouyinConfig: if __name__ == "__main__": # 使用示例 - scheduler = DouyinScheduler("cookies.json", "safari") + scheduler = DouyinScheduler("cookies/cookies.json", "safari") try: - # 发布视频示例 - success = scheduler.publish_video_with_product( - video_path="/Users/yarrow/autoPublisher/video/gdsc/2011_1749089195_raw.mp4", - title="广州周末亲子游", - description="带孩子探索科学的奥秘 #广州亲子游 #科学中心", - product_name="广东科学中心门票--亲子1大1小票", - product_info="广东科学中心门票", - schedule_time="2025-06-05 12:30" - ) + video_path = "/Users/yarrow/autoPublisher/video/gdsc/12575_1749192602.mp4" + title = "暑期遛娃宝藏地!科学中心解锁知识亲子趣时光~" + description = """ + 暑假想让娃远离电子产品? + 不想让娃在家 "葛优躺"? + 来广东科学中心,亲子开启「边玩边学」模式! - if success: - print("发布流程完成") - else: - print("发布流程失败") - + 👉 点击左下角购票,给娃一个 "科技含金量" 满满的暑假! + 📍 地址:广州大学城科普路 168 号 + 🚇 地铁 4 号线大学城北站转 801 路直达 + #广东科学中心 #暑期遛娃 #科技启蒙 #暑假去哪玩 #亲子时光 + """ + product_name = "广东科学中心门票--亲子1大1小票" + product_info = "广东科学中心门票" + schedule_time = "2025-06-10 19:10" + poster_path = "/Users/yarrow/autoPublisher/video/gdsc/poster/1.png" + success = scheduler.publish_video_with_product( + video_path=video_path, + title=title, + description=description, + poster_path=poster_path, + product_name=product_name, + product_info=product_info, + schedule_time=schedule_time + ) # 等待用户确认 scheduler.wait_for_user_confirmation("按回车键退出...") - + except Exception as e: + print(f"发布失败: {e}") finally: scheduler.close() \ No newline at end of file