266 lines
11 KiB
Python
266 lines
11 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
import re
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
from playwright.async_api import Playwright, async_playwright
|
|||
|
|
import os
|
|||
|
|
import asyncio
|
|||
|
|
from uploader.tk_uploader.tk_config import Tk_Locator
|
|||
|
|
from utils.base_social_media import set_init_script
|
|||
|
|
from utils.files_times import get_absolute_path
|
|||
|
|
from utils.log import tiktok_logger
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def cookie_auth(account_file):
|
|||
|
|
async with async_playwright() as playwright:
|
|||
|
|
browser = await playwright.firefox.launch(headless=True)
|
|||
|
|
context = await browser.new_context(storage_state=account_file)
|
|||
|
|
context = await set_init_script(context)
|
|||
|
|
# 创建一个新的页面
|
|||
|
|
page = await context.new_page()
|
|||
|
|
# 访问指定的 URL
|
|||
|
|
await page.goto("https://www.tiktok.com/tiktokstudio/upload?lang=en")
|
|||
|
|
await page.wait_for_load_state('networkidle')
|
|||
|
|
try:
|
|||
|
|
# 选择所有的 select 元素
|
|||
|
|
select_elements = await page.query_selector_all('select')
|
|||
|
|
for element in select_elements:
|
|||
|
|
class_name = await element.get_attribute('class')
|
|||
|
|
# 使用正则表达式匹配特定模式的 class 名称
|
|||
|
|
if re.match(r'tiktok-.*-SelectFormContainer.*', class_name):
|
|||
|
|
tiktok_logger.error("[+] cookie expired")
|
|||
|
|
return False
|
|||
|
|
tiktok_logger.success("[+] cookie valid")
|
|||
|
|
return True
|
|||
|
|
except:
|
|||
|
|
tiktok_logger.success("[+] cookie valid")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def tiktok_setup(account_file, handle=False):
|
|||
|
|
account_file = get_absolute_path(account_file, "tk_uploader")
|
|||
|
|
if not os.path.exists(account_file) or not await cookie_auth(account_file):
|
|||
|
|
if not handle:
|
|||
|
|
return False
|
|||
|
|
tiktok_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
|
|||
|
|
await get_tiktok_cookie(account_file)
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def get_tiktok_cookie(account_file):
|
|||
|
|
async with async_playwright() as playwright:
|
|||
|
|
options = {
|
|||
|
|
'args': [
|
|||
|
|
'--lang en-GB',
|
|||
|
|
],
|
|||
|
|
'headless': False, # Set headless option here
|
|||
|
|
}
|
|||
|
|
# Make sure to run headed.
|
|||
|
|
browser = await playwright.firefox.launch(**options)
|
|||
|
|
# Setup context however you like.
|
|||
|
|
context = await browser.new_context() # Pass any options
|
|||
|
|
context = await set_init_script(context)
|
|||
|
|
# Pause the page, and start recording manually.
|
|||
|
|
page = await context.new_page()
|
|||
|
|
await page.goto("https://www.tiktok.com/login?lang=en")
|
|||
|
|
await page.pause()
|
|||
|
|
# 点击调试器的继续,保存cookie
|
|||
|
|
await context.storage_state(path=account_file)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TiktokVideo(object):
|
|||
|
|
def __init__(self, title, file_path, tags, publish_date, account_file):
|
|||
|
|
self.title = title
|
|||
|
|
self.file_path = file_path
|
|||
|
|
self.tags = tags
|
|||
|
|
self.publish_date = publish_date
|
|||
|
|
self.account_file = account_file
|
|||
|
|
self.locator_base = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def set_schedule_time(self, page, publish_date):
|
|||
|
|
schedule_input_element = self.locator_base.get_by_label('Schedule')
|
|||
|
|
await schedule_input_element.wait_for(state='visible') # 确保按钮可见
|
|||
|
|
|
|||
|
|
await schedule_input_element.click()
|
|||
|
|
scheduled_picker = self.locator_base.locator('div.scheduled-picker')
|
|||
|
|
await scheduled_picker.locator('div.TUXInputBox').nth(1).click()
|
|||
|
|
|
|||
|
|
calendar_month = await self.locator_base.locator('div.calendar-wrapper span.month-title').inner_text()
|
|||
|
|
|
|||
|
|
n_calendar_month = datetime.strptime(calendar_month, '%B').month
|
|||
|
|
|
|||
|
|
schedule_month = publish_date.month
|
|||
|
|
|
|||
|
|
if n_calendar_month != schedule_month:
|
|||
|
|
if n_calendar_month < schedule_month:
|
|||
|
|
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(-1)
|
|||
|
|
else:
|
|||
|
|
arrow = self.locator_base.locator('div.calendar-wrapper span.arrow').nth(0)
|
|||
|
|
await arrow.click()
|
|||
|
|
|
|||
|
|
# day set
|
|||
|
|
valid_days_locator = self.locator_base.locator(
|
|||
|
|
'div.calendar-wrapper span.day.valid')
|
|||
|
|
valid_days = await valid_days_locator.count()
|
|||
|
|
for i in range(valid_days):
|
|||
|
|
day_element = valid_days_locator.nth(i)
|
|||
|
|
text = await day_element.inner_text()
|
|||
|
|
if text.strip() == str(publish_date.day):
|
|||
|
|
await day_element.click()
|
|||
|
|
break
|
|||
|
|
# time set
|
|||
|
|
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
|
|||
|
|
|
|||
|
|
hour_str = publish_date.strftime("%H")
|
|||
|
|
correct_minute = int(publish_date.minute / 5)
|
|||
|
|
minute_str = f"{correct_minute:02d}"
|
|||
|
|
|
|||
|
|
hour_selector = f"span.tiktok-timepicker-left:has-text('{hour_str}')"
|
|||
|
|
minute_selector = f"span.tiktok-timepicker-right:has-text('{minute_str}')"
|
|||
|
|
|
|||
|
|
# pick hour first
|
|||
|
|
await self.locator_base.locator(hour_selector).click()
|
|||
|
|
# click time button again
|
|||
|
|
# 等待某个特定的元素出现或状态变化,表明UI已更新
|
|||
|
|
await page.wait_for_timeout(1000) # 等待500毫秒
|
|||
|
|
await scheduled_picker.locator('div.TUXInputBox').nth(0).click()
|
|||
|
|
# pick minutes after
|
|||
|
|
await self.locator_base.locator(minute_selector).click()
|
|||
|
|
|
|||
|
|
# click title to remove the focus.
|
|||
|
|
await self.locator_base.locator("h1:has-text('Upload video')").click()
|
|||
|
|
|
|||
|
|
async def handle_upload_error(self, page):
|
|||
|
|
tiktok_logger.info("video upload error retrying.")
|
|||
|
|
select_file_button = self.locator_base.locator('button[aria-label="Select file"]')
|
|||
|
|
async with page.expect_file_chooser() as fc_info:
|
|||
|
|
await select_file_button.click()
|
|||
|
|
file_chooser = await fc_info.value
|
|||
|
|
await file_chooser.set_files(self.file_path)
|
|||
|
|
|
|||
|
|
async def upload(self, playwright: Playwright) -> None:
|
|||
|
|
browser = await playwright.firefox.launch(headless=False)
|
|||
|
|
context = await browser.new_context(storage_state=f"{self.account_file}")
|
|||
|
|
context = await set_init_script(context)
|
|||
|
|
page = await context.new_page()
|
|||
|
|
|
|||
|
|
await page.goto("https://www.tiktok.com/creator-center/upload")
|
|||
|
|
tiktok_logger.info(f'[+]Uploading-------{self.title}.mp4')
|
|||
|
|
|
|||
|
|
await page.wait_for_url("https://www.tiktok.com/tiktokstudio/upload", timeout=10000)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
await page.wait_for_selector('iframe[data-tt="Upload_index_iframe"], div.upload-container', timeout=10000)
|
|||
|
|
tiktok_logger.info("Either iframe or div appeared.")
|
|||
|
|
except Exception as e:
|
|||
|
|
tiktok_logger.error("Neither iframe nor div appeared within the timeout.")
|
|||
|
|
|
|||
|
|
await self.choose_base_locator(page)
|
|||
|
|
|
|||
|
|
upload_button = self.locator_base.locator(
|
|||
|
|
'button:has-text("Select video"):visible')
|
|||
|
|
await upload_button.wait_for(state='visible') # 确保按钮可见
|
|||
|
|
|
|||
|
|
async with page.expect_file_chooser() as fc_info:
|
|||
|
|
await upload_button.click()
|
|||
|
|
file_chooser = await fc_info.value
|
|||
|
|
await file_chooser.set_files(self.file_path)
|
|||
|
|
|
|||
|
|
await self.add_title_tags(page)
|
|||
|
|
# detact upload status
|
|||
|
|
await self.detect_upload_status(page)
|
|||
|
|
if self.publish_date != 0:
|
|||
|
|
await self.set_schedule_time(page, self.publish_date)
|
|||
|
|
|
|||
|
|
await self.click_publish(page)
|
|||
|
|
|
|||
|
|
await context.storage_state(path=f"{self.account_file}") # save cookie
|
|||
|
|
tiktok_logger.info(' [-] update cookie!')
|
|||
|
|
await asyncio.sleep(2) # close delay for look the video status
|
|||
|
|
# close all
|
|||
|
|
await context.close()
|
|||
|
|
await browser.close()
|
|||
|
|
|
|||
|
|
async def add_title_tags(self, page):
|
|||
|
|
|
|||
|
|
editor_locator = self.locator_base.locator('div.public-DraftEditor-content')
|
|||
|
|
await editor_locator.click()
|
|||
|
|
|
|||
|
|
await page.keyboard.press("End")
|
|||
|
|
|
|||
|
|
await page.keyboard.press("Control+A")
|
|||
|
|
|
|||
|
|
await page.keyboard.press("Delete")
|
|||
|
|
|
|||
|
|
await page.keyboard.press("End")
|
|||
|
|
|
|||
|
|
await page.wait_for_timeout(1000) # 等待1秒
|
|||
|
|
|
|||
|
|
await page.keyboard.insert_text(self.title)
|
|||
|
|
await page.wait_for_timeout(1000) # 等待1秒
|
|||
|
|
await page.keyboard.press("End")
|
|||
|
|
|
|||
|
|
await page.keyboard.press("Enter")
|
|||
|
|
|
|||
|
|
# tag part
|
|||
|
|
for index, tag in enumerate(self.tags, start=1):
|
|||
|
|
tiktok_logger.info("Setting the %s tag" % index)
|
|||
|
|
await page.keyboard.press("End")
|
|||
|
|
await page.wait_for_timeout(1000) # 等待1秒
|
|||
|
|
await page.keyboard.insert_text("#" + tag + " ")
|
|||
|
|
await page.keyboard.press("Space")
|
|||
|
|
await page.wait_for_timeout(1000) # 等待1秒
|
|||
|
|
|
|||
|
|
await page.keyboard.press("Backspace")
|
|||
|
|
await page.keyboard.press("End")
|
|||
|
|
|
|||
|
|
async def click_publish(self, page):
|
|||
|
|
success_flag_div = '#\\:r9\\:'
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
publish_button = self.locator_base.locator('div.btn-post')
|
|||
|
|
if await publish_button.count():
|
|||
|
|
await publish_button.click()
|
|||
|
|
|
|||
|
|
await self.locator_base.locator(success_flag_div).wait_for(state="visible", timeout=3000)
|
|||
|
|
tiktok_logger.success(" [-] video published success")
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
if await self.locator_base.locator(success_flag_div).count():
|
|||
|
|
tiktok_logger.success(" [-]video published success")
|
|||
|
|
break
|
|||
|
|
else:
|
|||
|
|
tiktok_logger.exception(f" [-] Exception: {e}")
|
|||
|
|
tiktok_logger.info(" [-] video publishing")
|
|||
|
|
await page.screenshot(full_page=True)
|
|||
|
|
await asyncio.sleep(0.5)
|
|||
|
|
|
|||
|
|
async def detect_upload_status(self, page):
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
if await self.locator_base.locator('div.btn-post > button').get_attribute("disabled") is None:
|
|||
|
|
tiktok_logger.info(" [-]video uploaded.")
|
|||
|
|
break
|
|||
|
|
else:
|
|||
|
|
tiktok_logger.info(" [-] video uploading...")
|
|||
|
|
await asyncio.sleep(2)
|
|||
|
|
if await self.locator_base.locator('button[aria-label="Select file"]').count():
|
|||
|
|
tiktok_logger.info(" [-] found some error while uploading now retry...")
|
|||
|
|
await self.handle_upload_error(page)
|
|||
|
|
except:
|
|||
|
|
tiktok_logger.info(" [-] video uploading...")
|
|||
|
|
await asyncio.sleep(2)
|
|||
|
|
|
|||
|
|
async def choose_base_locator(self, page):
|
|||
|
|
# await page.wait_for_selector('div.upload-container')
|
|||
|
|
if await page.locator('iframe[data-tt="Upload_index_iframe"]').count():
|
|||
|
|
self.locator_base = self.locator_base
|
|||
|
|
else:
|
|||
|
|
self.locator_base = page.locator(Tk_Locator.default)
|
|||
|
|
|
|||
|
|
async def main(self):
|
|||
|
|
async with async_playwright() as playwright:
|
|||
|
|
await self.upload(playwright)
|
|||
|
|
|