401 lines
16 KiB
Python
401 lines
16 KiB
Python
from pathlib import Path
|
||
import sys
|
||
import os
|
||
|
||
# Add the project root directory to the Python import path
|
||
current_dir = Path(__file__).parent.resolve()
|
||
project_root = current_dir.parent.parent
|
||
sys.path.append(str(project_root))
|
||
|
||
from conf import LOCAL_CHROME_PATH, BASE_DIR
|
||
import re
|
||
from datetime import datetime
|
||
|
||
from playwright.async_api import Playwright, async_playwright
|
||
import os
|
||
import asyncio
|
||
#from uploader.tk_uploader.tk_config import Tk_Locator
|
||
from utils.base_social_media import set_init_script
|
||
from utils.files_times import get_absolute_path
|
||
from utils.log import X_logger
|
||
from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page
|
||
|
||
|
||
async def cookie_auth(account_file):
    """Check whether the login state saved in ``account_file`` still looks valid.

    Launches a headless Chromium, loads the saved storage state, opens the X
    login-flow page and scans every ``<select>`` element for a class name
    matching ``ins-.*-SelectFormContainer.*``. A match is treated as an
    expired cookie.

    Args:
        account_file: Path to a Playwright storage-state file.

    Returns:
        ``False`` if a matching ``<select>`` element is found, ``True``
        otherwise (including when inspecting the page raises an error, which
        is the historical best-effort behavior of this function).
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context(storage_state=account_file)
            context = await set_init_script(context)
            # Open a new page and navigate to the login flow.
            page = await context.new_page()
            await page.goto("https://x.com/i/flow/login")
            await page.wait_for_load_state('domcontentloaded', timeout=30000)
            try:
                # Inspect every <select> element on the page.
                select_elements = await page.query_selector_all('select')
                for element in select_elements:
                    class_name = await element.get_attribute('class')
                    # get_attribute() yields None when the attribute is absent;
                    # skip such elements instead of letting re.match() raise
                    # and abort the scan of the remaining elements.
                    # NOTE(review): the "ins-" pattern looks inherited from
                    # another uploader - confirm it matches X's markup.
                    if class_name and re.match(r'ins-.*-SelectFormContainer.*', class_name):
                        X_logger.error("[+] cookie expired")
                        return False
            except Exception as e:
                # Best effort: a failed inspection is still reported as
                # "valid", but the error is no longer hidden, and
                # cancellation/KeyboardInterrupt are no longer swallowed.
                X_logger.warning(f"cookie check could not inspect the page: {e}")
            X_logger.success("[+] cookie valid")
            return True
        finally:
            # Always release the browser, whichever path is taken.
            await browser.close()
|
||
|
||
|
||
async def X_setup(account_file, handle=False):
    """Ensure a usable cookie file exists for the X uploader.

    Args:
        account_file: Cookie file path, resolved through
            ``get_absolute_path(account_file, "X_uploader")``.
        handle: When true, open an interactive browser login to (re)create
            the cookie file if it is missing or fails ``cookie_auth``.

    Returns:
        ``True`` if the cookie file passed the check or the interactive
        login was run; ``False`` if the cookie is unusable and ``handle``
        is false.
    """
    account_file = get_absolute_path(account_file, "X_uploader")

    # Short-circuit keeps cookie_auth from running when the file is missing.
    cookie_ok = os.path.exists(account_file) and await cookie_auth(account_file)
    if cookie_ok:
        return True

    if not handle:
        return False

    X_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
    await get_X_cookie(account_file)
    return True
|
||
|
||
|
||
async def get_X_cookie(account_file):
    """Open a headed browser for a manual login and save the session state.

    The page is paused with ``page.pause()``; once the user resumes from the
    Playwright inspector, the context's storage state is written to
    ``account_file``.

    Args:
        account_file: Destination path for the Playwright storage-state file.
    """
    async with async_playwright() as playwright:
        launch_options = {
            'args': [
                # Chromium switches take the form --name=value; the former
                # '--lang en-GB' (with a space) was not a valid --lang switch.
                '--lang=en-GB',
            ],
            'headless': False,  # Must run headed so the user can log in.
        }
        browser = await playwright.chromium.launch(**launch_options)
        try:
            context = await browser.new_context()
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto("https://x.com/")
            # Wait here until the user clicks "resume" in the inspector.
            await page.pause()
            # Persist cookies / local storage after the manual login.
            await context.storage_state(path=account_file)
        finally:
            # Release the browser even if navigation or saving fails.
            await browser.close()
|
||
|
||
class XVideo(object):
    """Skeleton uploader that posts a video to X (Twitter) through Playwright.

    The actual posting steps are not implemented yet (see the TODO in
    ``upload``); the class currently opens the site with the saved login
    state and refreshes the cookie file.
    """

    def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None, headless=True):
        """Store the upload parameters.

        Args:
            title: Text used in log output for this upload.
            file_path: Path of the media file to upload.
            tags: Tags associated with the post.
            publish_date: Intended publish time (stored, not used yet).
            account_file: Path to the Playwright storage-state (cookie) file.
            thumbnail_path: Optional thumbnail path (stored, not used yet).
            headless: Whether the browser runs headless; defaults to True.
        """
        self.title = title
        self.file_path = file_path
        self.tags = tags
        self.publish_date = publish_date
        self.thumbnail_path = thumbnail_path
        self.account_file = account_file
        self.local_executable_path = LOCAL_CHROME_PATH
        self.headless = headless
        self.locator_base = None

    async def handle_upload_error(self, page):
        """Retry the upload by re-locating the file input and re-setting the file.

        Any failure during the retry is logged and not re-raised.
        """
        X_logger.info("X(Twitter)上传错误,正在重试...")
        try:
            # Look up the file input again and retry.
            upload_element = page.locator('input[type="file"]')
            await upload_element.wait_for(state='visible', timeout=10000)
            await upload_element.set_input_files(self.file_path)
            X_logger.info("重试上传成功")
        except Exception as e:
            X_logger.error(f"重试上传失败: {e}")

    async def upload(self, playwright: Playwright) -> None:
        """Open X with the saved session, run the upload steps and save cookies.

        The browser and context are closed in ``finally`` blocks so they are
        released even when page loading or cookie saving raises; such errors
        still propagate to the caller.

        Args:
            playwright: An active Playwright instance.
        """
        # Use the shared anti-detection helpers to build the browser stack.
        browser = await create_stealth_browser(
            playwright=playwright,
            headless=self.headless,
            executable_path=self.local_executable_path
        )
        try:
            context = await create_stealth_context(
                browser=browser,
                account_file=self.account_file,
                headless=self.headless
            )
            try:
                page = await setup_stealth_page(context, "https://x.com/")
                X_logger.info(f'[+]Uploading to X(Twitter)-------{self.title}')

                await page.wait_for_load_state('domcontentloaded', timeout=30000)

                try:
                    # Basic X upload logic (to be completed against the real
                    # page structure).
                    X_logger.info("X上传器基础框架已就绪,需要根据实际页面结构完善上传逻辑")

                    # TODO: implement the actual upload steps
                    # 1. Click the compose/post button
                    # 2. Upload the media file
                    # 3. Add the text content
                    # 4. Publish the post
                except Exception as e:
                    X_logger.error(f"上传过程中出现错误: {e}")
                    await self.handle_upload_error(page)

                # Persist the (possibly refreshed) cookies.
                await context.storage_state(path=f"{self.account_file}")
                X_logger.info('  [-] 更新cookie成功!')
                await asyncio.sleep(2)
            finally:
                await context.close()
        finally:
            await browser.close()

    async def main(self):
        """Run ``upload`` inside a managed Playwright session."""
        async with async_playwright() as playwright:
            await self.upload(playwright)
|
||
|
||
|
||
async def main():
    """Entry point for manually testing the X upload flow."""
    from pathlib import Path

    # Repository root: three levels above this file's directory entry.
    repo_root = Path(__file__).parent.parent.parent

    # Cookie file location; create its directory tree if needed.
    account_file = repo_root / "cookies" / "X_uploader" / "account.json"
    account_file.parent.mkdir(exist_ok=True, parents=True)

    # Make sure a valid cookie exists before doing anything else.
    cookie_ready = await X_setup(str(account_file), handle=True)
    if not cookie_ready:
        X_logger.error("Cookie设置失败")
        return

    # The demo video must exist for the test upload.
    video_path = repo_root / "videos" / "demo.mp4"
    if not video_path.exists():
        X_logger.error(f"测试视频文件不存在: {video_path}")
        return

    upload_kwargs = dict(
        title="测试X(Twitter)上传",
        file_path=str(video_path),
        tags=["test", "twitter", "upload"],
        publish_date=datetime.now(),
        account_file=str(account_file),
        headless=False,  # Headed mode is recommended.
    )
    x_video = XVideo(**upload_kwargs)

    # Start the upload.
    await x_video.main()
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry: make sure the cookie file exists, logging in
    # interactively if it is missing or fails the check.
    account_file = Path(BASE_DIR) / "cookies" / "X_uploader" / "account.json"
    # parents=True so a fresh checkout without a "cookies" directory does not
    # fail with FileNotFoundError (matches what main() already does).
    account_file.parent.mkdir(exist_ok=True, parents=True)
    cookie_setup = asyncio.run(X_setup(str(account_file), handle=True))

    # To test the upload flow as well, uncomment the line below.
    # asyncio.run(main())
|
||
|
||
# Legacy commented-out code below (Instagram uploader), kept for reference only.
|
||
# class InsVideo(object):
|
||
# def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None):
|
||
# self.title = title
|
||
# self.file_path = file_path
|
||
# self.tags = tags
|
||
# self.publish_date = publish_date
|
||
# self.thumbnail_path = thumbnail_path
|
||
# self.account_file = account_file
|
||
# self.local_executable_path = LOCAL_CHROME_PATH
|
||
# self.locator_base = None
|
||
# # click title to remove the focus.
|
||
# # await self.locator_base.locator("h1:has-text('Upload video')").click()
|
||
|
||
# async def handle_upload_error(self, page):
|
||
# X_logger.info("Instagram上传错误,正在重试...")
|
||
# try:
|
||
# # 重新查找"从电脑中选择"按钮并重试
|
||
# select_from_computer_button = page.locator('button._aswp._aswr._aswu._asw_._asx2:has-text("从电脑中选择")')
|
||
# await select_from_computer_button.wait_for(state='visible', timeout=10000)
|
||
|
||
# async with page.expect_file_chooser() as fc_info:
|
||
# await select_from_computer_button.click()
|
||
# file_chooser = await fc_info.value
|
||
# await file_chooser.set_files(self.file_path)
|
||
# X_logger.info("重试上传成功")
|
||
# except Exception as e:
|
||
# X_logger.error(f"重试上传失败: {e}")
|
||
|
||
# async def upload(self, playwright: Playwright) -> None:
|
||
# browser = await playwright.chromium.launch(headless=False, executable_path=self.local_executable_path)
|
||
# context = await browser.new_context(storage_state=f"{self.account_file}")
|
||
# context = await set_init_script(context)
|
||
# page = await context.new_page()
|
||
|
||
# # 访问Instagram主页
|
||
# await page.goto("https://www.instagram.com/")
|
||
# X_logger.info(f'[+]Uploading to Instagram-------{self.title}')
|
||
|
||
# await page.wait_for_load_state('domcontentloaded', timeout=30000)
|
||
|
||
# try:
|
||
# # 点击创建按钮
|
||
# create_button = page.locator('span.x1lliihq.x193iq5w.x6ikm8r.x10wlt62.xlyipyv.xuxw1ft:has-text("创建")')
|
||
# await create_button.wait_for(state='visible', timeout=10000)
|
||
# await create_button.click()
|
||
# X_logger.info("成功点击创建按钮")
|
||
|
||
# # 等待弹窗出现并点击"从电脑中选择"
|
||
# await page.wait_for_timeout(2000) # 等待弹窗加载
|
||
|
||
# select_from_computer_button = page.locator('button._aswp._aswr._aswu._asw_._asx2:has-text("从电脑中选择")')
|
||
# await select_from_computer_button.wait_for(state='visible', timeout=10000)
|
||
|
||
# # 选择文件
|
||
# async with page.expect_file_chooser() as fc_info:
|
||
# await select_from_computer_button.click()
|
||
# file_chooser = await fc_info.value
|
||
# await file_chooser.set_files(self.file_path)
|
||
# X_logger.info("成功选择文件")
|
||
|
||
# # 等待文件上传完成
|
||
# await page.wait_for_timeout(5000)
|
||
# select_crop_button = page.locator('role=button >> text=继续')
|
||
# await select_crop_button.wait_for(state='visible', timeout=10000)
|
||
# await select_crop_button.click()
|
||
# select_crop_button = page.locator('role=button >> text=继续')
|
||
# await select_crop_button.wait_for(state='visible', timeout=10000)
|
||
# await select_crop_button.click()
|
||
# # 添加标题和标签
|
||
# await self.add_title_tags(page)
|
||
|
||
# # 发布内容
|
||
# await self.click_publish(page)
|
||
|
||
# except Exception as e:
|
||
# X_logger.error(f"上传过程中出现错误: {e}")
|
||
# # 处理上传错误
|
||
# await self.handle_upload_error(page)
|
||
|
||
# await context.storage_state(path=f"{self.account_file}") # save cookie
|
||
# X_logger.info(' [-] 更新cookie成功!')
|
||
# await asyncio.sleep(2) # close delay for look the video status
|
||
# # close all
|
||
# await context.close()
|
||
# await browser.close()
|
||
|
||
# async def add_title_tags(self, page):
|
||
# try:
|
||
# # 等待Instagram的文本输入框出现
|
||
# await page.wait_for_timeout(3000) # 等待页面加载
|
||
|
||
# # 查找Instagram的文本输入框(可能有多种选择器)
|
||
# text_selectors = [
|
||
# 'textarea[aria-label="写下说明…"]',
|
||
# 'textarea[placeholder="写下说明…"]',
|
||
# 'div[contenteditable="true"]',
|
||
# 'textarea'
|
||
# ]
|
||
|
||
# text_area = None
|
||
# for selector in text_selectors:
|
||
# try:
|
||
# text_area = page.locator(selector).first
|
||
# await text_area.wait_for(state='visible', timeout=5000)
|
||
# break
|
||
# except:
|
||
# continue
|
||
|
||
# if text_area:
|
||
# await text_area.click()
|
||
# await page.wait_for_timeout(1000)
|
||
|
||
# # 清空现有内容
|
||
# await page.keyboard.press("Control+A")
|
||
# await page.keyboard.press("Delete")
|
||
|
||
# # 输入标题
|
||
# await page.keyboard.insert_text(self.title)
|
||
# await page.keyboard.press("Enter")
|
||
# await page.keyboard.press("Enter")
|
||
|
||
# # 添加标签
|
||
# for index, tag in enumerate(self.tags, start=1):
|
||
# X_logger.info(f"添加第 {index} 个标签: {tag}")
|
||
# await page.keyboard.insert_text(f"#{tag} ")
|
||
# await page.wait_for_timeout(500)
|
||
|
||
# X_logger.info("成功添加标题和标签")
|
||
# else:
|
||
# X_logger.warning("未找到文本输入框,跳过添加标题和标签")
|
||
# # 调试用:暂停10秒,让你有时间观察页面上的标签是否正确输入
|
||
# await page.wait_for_timeout(10000)
|
||
# X_logger.info("观察时间结束,中断程序")
|
||
|
||
# # 强制中断(后续代码不执行)
|
||
# raise SystemExit("调试中断:标签输入流程完成")
|
||
# except Exception as e:
|
||
# X_logger.error(f"添加标题和标签时出错: {e}")
|
||
|
||
# async def click_publish(self, page):
|
||
# try:
|
||
# # 等待分享/发布按钮出现
|
||
# await page.wait_for_timeout(2000)
|
||
|
||
# # Instagram发布按钮的可能选择器
|
||
# publish_selectors = [
|
||
# 'button:has-text("分享")',
|
||
# 'button:has-text("Share")',
|
||
# 'button:has-text("发布")',
|
||
# 'button:has-text("Post")',
|
||
# 'button[type="submit"]'
|
||
# ]
|
||
|
||
# publish_button = None
|
||
# for selector in publish_selectors:
|
||
# try:
|
||
# publish_button = page.locator(selector).first
|
||
# await publish_button.wait_for(state='visible', timeout=5000)
|
||
# break
|
||
# except:
|
||
# continue
|
||
|
||
# if publish_button:
|
||
# await publish_button.click()
|
||
# X_logger.info("成功点击发布按钮")
|
||
|
||
# # 等待发布完成
|
||
# await page.wait_for_timeout(5000)
|
||
# X_logger.success("Instagram内容发布成功!")
|
||
# else:
|
||
# X_logger.warning("未找到发布按钮")
|
||
|
||
# except Exception as e:
|
||
# X_logger.error(f"发布时出错: {e}")
|
||
|
||
|
||
# async def main():
|
||
# """用于测试Instagram上传功能的主函数"""
|
||
# from pathlib import Path
|
||
|
||
# # 配置文件路径
|
||
# account_file = Path(__file__).parent.parent.parent / "cookies" / "ins_uploader" / "account.json"
|
||
# account_file.parent.mkdir(exist_ok=True, parents=True)
|
||
|
||
# # 首先确保cookie有效
|
||
# if not await ins_setup(str(account_file), handle=True):
|
||
# X_logger.error("Cookie设置失败")
|
||
# return
|
||
|
||
# # 创建测试视频对象
|
||
# video_path = Path(__file__).parent.parent.parent / "videos" / "demo.mp4"
|
||
|
||
# if not video_path.exists():
|
||
# X_logger.error(f"测试视频文件不存在: {video_path}")
|
||
# return
|
||
|
||
# ins_video = InsVideo(
|
||
# title="测试Instagram上传",
|
||
# file_path=str(video_path),
|
||
# tags=["test", "instagram", "upload"],
|
||
# publish_date=datetime.now(),
|
||
# account_file=str(account_file)
|
||
# )
|
||
|
||
# # 开始上传
|
||
# async with async_playwright() as playwright:
|
||
# await ins_video.upload(playwright)
|
||
|
||
|
||
# if __name__ == '__main__':
|
||
# asyncio.run(main()) |