325 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
import sys
import os
# Add the project root (two directories above this file's directory) to the
# Python import path so the top-level `conf` and `utils` packages imported
# below can be resolved when this file is run directly as a script.
current_dir = Path(__file__).parent.resolve()
project_root = current_dir.parent.parent
sys.path.append(str(project_root))
from conf import LOCAL_CHROME_PATH, BASE_DIR
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
#from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import ins_logger
from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page
from utils.human_typing_wrapper import create_human_typer
async def cookie_auth(account_file):
    """Check whether the stored login state in *account_file* still looks valid.

    Opens a headless Chromium with the saved storage state, loads the
    Instagram home page and scans every ``<select>`` element for a class name
    matching ``ins-.*-SelectFormContainer.*``; a match is treated as
    "cookie expired".

    Args:
        account_file: Path to a Playwright storage-state JSON file.

    Returns:
        ``False`` when a matching ``<select>`` is found, ``True`` otherwise
        (including when the page could not be inspected -- best effort).
    """
    # NOTE(review): the class pattern looks inherited from another uploader;
    # confirm that Instagram actually renders such an element when logged out.
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context(storage_state=account_file)
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto("https://www.instagram.com/")
            await page.wait_for_load_state('domcontentloaded', timeout=30000)
            try:
                for element in await page.query_selector_all('select'):
                    class_name = await element.get_attribute('class')
                    # get_attribute() yields None for elements without a class
                    # attribute; skip those instead of letting re.match raise
                    # and abort the scan of the remaining elements.
                    if class_name and re.match(r'ins-.*-SelectFormContainer.*', class_name):
                        ins_logger.error("[+] cookie expired")
                        return False
            except Exception as e:
                # Best effort, as before: an inspection failure does not mark
                # the cookie as expired, but it is no longer hidden. Catching
                # Exception (not a bare except) lets task cancellation and
                # KeyboardInterrupt propagate.
                ins_logger.warning(f"[+] cookie check could not inspect the page, assuming valid: {e}")
            ins_logger.success("[+] cookie valid")
            return True
        finally:
            await browser.close()
async def ins_setup(account_file, handle=False):
    """Make sure a usable cookie file exists for the Instagram uploader.

    Args:
        account_file: Cookie file path, resolved through
            ``get_absolute_path(account_file, "ins_uploader")``.
        handle: When true, a missing or expired cookie triggers the
            interactive login flow (``get_ins_cookie``); when false the
            function just reports failure.

    Returns:
        ``True`` if the cookie is valid or was regenerated, ``False`` if it
        is missing/expired and ``handle`` is false.
    """
    account_file = get_absolute_path(account_file, "ins_uploader")

    # Same short-circuit as before: the cookie is only validated in a browser
    # when the file actually exists.
    cookie_ok = os.path.exists(account_file) and await cookie_auth(account_file)
    if cookie_ok:
        return True

    if not handle:
        return False

    ins_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
    await get_ins_cookie(account_file)
    return True
async def get_ins_cookie(account_file):
    """Open a headed browser for a manual Instagram login and save the session.

    The page is paused with ``page.pause()``; once the user resumes from the
    Playwright inspector, the context's storage state is written to
    *account_file*.

    Args:
        account_file: Destination path for the storage-state JSON.
    """
    async with async_playwright() as pw:
        # NOTE(review): Chromium switches are normally written "--name=value";
        # confirm that '--lang zh-cn' has the intended effect.
        launch_args = ['--lang zh-cn']
        # The login is interactive, so the browser must run headed.
        browser = await pw.chromium.launch(args=launch_args, headless=False)

        fresh_context = await browser.new_context()
        context = await set_init_script(fresh_context)

        page = await context.new_page()
        await page.goto("https://www.instagram.com/")
        # Blocks until the user clicks "resume" in the inspector after login.
        await page.pause()

        await context.storage_state(path=account_file)
# if __name__ == '__main__':
# account_file = Path(BASE_DIR / "cookies" / "ins_uploader" / "account.json")
# account_file.parent.mkdir(exist_ok=True)
# cookie_setup = asyncio.run(ins_setup(str(account_file), handle=True))
class InsVideo(object):
    """Upload one video to Instagram through a Playwright-driven browser.

    The selectors target the Chinese-language Instagram web UI
    ("创建", "从电脑中选择", "继续", ...).
    """

    # Button inside the "create" dialog that opens the native file chooser.
    SELECT_FILE_BUTTON = 'button._aswp._aswr._aswu._asw_._asx2:has-text("从电脑中选择")'

    # Candidate selectors for the caption input, tried in order.
    CAPTION_SELECTORS = (
        'textarea[aria-label="写下说明…"]',
        'textarea[placeholder="写下说明…"]',
        'div[contenteditable="true"]',
        'textarea',
    )

    # Candidate selectors for the share/publish button, tried in order.
    PUBLISH_SELECTORS = (
        'button:has-text("分享")',
        'button:has-text("Share")',
        'button:has-text("发布")',
        'button:has-text("Post")',
        'button[type="submit"]',
    )

    def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None, headless=True):
        """Store the upload parameters.

        Args:
            title: Caption text placed before the hashtags.
            file_path: Path of the video file handed to the file chooser.
            tags: Iterable of tag strings (without the leading ``#``).
            publish_date: Stored only; not read by any method of this class.
            account_file: Storage-state (cookie) file used for the browser
                context and rewritten after the upload attempt.
            thumbnail_path: Stored only; not read by any method of this class.
            headless: Forwarded to the browser/context factory helpers.
        """
        self.title = title
        self.file_path = file_path
        self.tags = tags
        self.publish_date = publish_date
        self.thumbnail_path = thumbnail_path
        self.account_file = account_file
        self.local_executable_path = LOCAL_CHROME_PATH
        self.locator_base = None
        self.headless = headless

    @staticmethod
    def _build_caption(title, tags):
        """Return the caption text: title, blank line, then space-separated hashtags."""
        hashtags = " ".join(f"#{tag}" for tag in (tags or []))
        return f"{title}\n\n{hashtags} "

    @staticmethod
    async def _first_visible(page, selectors, timeout=5000):
        """Return ``(selector, locator)`` for the first selector that becomes visible.

        Returns ``(None, None)`` when none of the selectors becomes visible
        within *timeout* milliseconds each.
        """
        for selector in selectors:
            candidate = page.locator(selector).first
            try:
                await candidate.wait_for(state='visible', timeout=timeout)
            except Exception:
                # Not a bare except: task cancellation must still propagate.
                continue
            return selector, candidate
        return None, None

    async def _choose_file_from_computer(self, page):
        """Click the "select from computer" button and feed it ``self.file_path``."""
        select_button = page.locator(self.SELECT_FILE_BUTTON)
        await select_button.wait_for(state='visible', timeout=10000)
        async with page.expect_file_chooser() as fc_info:
            await select_button.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(self.file_path)

    async def handle_upload_error(self, page):
        """Retry selecting the video file once after a failed upload attempt.

        Failures of the retry are logged and not re-raised.
        """
        ins_logger.info("Instagram上传错误正在重试...")
        try:
            await self._choose_file_from_computer(page)
            ins_logger.info("重试上传成功")
        except Exception as e:
            ins_logger.error(f"重试上传失败: {e}")

    async def upload(self, playwright: "Playwright") -> None:
        """Run the whole upload flow and persist the refreshed cookies.

        Steps: open Instagram in a stealth browser/context, click "create",
        pick the video file, click "continue" twice, enter the caption and
        publish. Errors in the flow are logged and followed by one
        file-selection retry; the storage state is saved afterwards either
        way. The context and browser are always closed.

        Args:
            playwright: A started Playwright instance.
        """
        browser = await create_stealth_browser(
            playwright=playwright,
            headless=self.headless,
            executable_path=self.local_executable_path
        )
        try:
            context = await create_stealth_context(
                browser=browser,
                account_file=self.account_file,
                headless=self.headless
            )
            try:
                page = await setup_stealth_page(context, "https://www.instagram.com/")
                # Wrapper used to enter the caption with human-like typing.
                human_typer = create_human_typer(page)
                ins_logger.info(f'[+]Uploading to Instagram-------{self.title}')
                await page.wait_for_load_state('domcontentloaded', timeout=30000)
                try:
                    create_button = page.locator('span.x1lliihq.x193iq5w.x6ikm8r.x10wlt62.xlyipyv.xuxw1ft:has-text("创建")')
                    await create_button.wait_for(state='visible', timeout=10000)
                    await create_button.click()
                    ins_logger.info("成功点击创建按钮")

                    # Give the upload dialog time to render.
                    await page.wait_for_timeout(2000)
                    await self._choose_file_from_computer(page)
                    ins_logger.info("成功选择文件")

                    # Fixed wait for the file to be processed.
                    await page.wait_for_timeout(5000)
                    # "Continue" is clicked twice, once on each of the two
                    # intermediate dialog steps.
                    for _ in range(2):
                        continue_button = page.locator('role=button >> text=继续')
                        await continue_button.wait_for(state='visible', timeout=10000)
                        await continue_button.click()

                    await self.add_title_tags(page, human_typer)
                    await self.click_publish(page)
                except Exception as e:
                    ins_logger.error(f"上传过程中出现错误: {e}")
                    await self.handle_upload_error(page)

                await context.storage_state(path=f"{self.account_file}")  # save cookie
                ins_logger.info(' [-] 更新cookie成功')
                await asyncio.sleep(2)  # short delay before closing so the result can be seen
            finally:
                await context.close()
        finally:
            await browser.close()

    async def add_title_tags(self, page, human_typer):
        """Type the caption (title plus hashtags) into the caption input.

        Tries ``human_typer.type_text_human`` first and falls back to direct
        keyboard input when it reports failure. All errors are logged and
        swallowed so the caller can continue to the publish step.

        Args:
            page: Playwright page showing the caption dialog.
            human_typer: Object returned by ``create_human_typer(page)``.
        """
        try:
            await page.wait_for_timeout(3000)  # let the caption step load
            selector, text_area = await self._first_visible(page, self.CAPTION_SELECTORS)
            if text_area is None:
                ins_logger.warning("未找到文本输入框,跳过添加标题和标签")
                return

            success = await human_typer.type_text_human(
                selector,
                self._build_caption(self.title, self.tags),
                clear_first=True
            )
            if not success:
                ins_logger.warning("人类化输入失败,使用传统方式")
                await text_area.click()
                await page.wait_for_timeout(1000)
                # Clear whatever is already in the field.
                await page.keyboard.press("Control+A")
                await page.keyboard.press("Delete")
                await page.keyboard.insert_text(self.title)
                await page.keyboard.press("Enter")
                await page.keyboard.press("Enter")
                for index, tag in enumerate(self.tags or [], start=1):
                    ins_logger.info(f"添加第 {index} 个标签: {tag}")
                    await page.keyboard.insert_text(f"#{tag} ")
                    await page.wait_for_timeout(500)
            ins_logger.info("成功添加标题和标签")
            # A leftover debugging pause followed by `raise SystemExit` used to
            # sit here; it terminated the process before click_publish() and
            # before the cookie save / browser cleanup, so it was removed.
        except Exception as e:
            ins_logger.error(f"添加标题和标签时出错: {e}")

    async def click_publish(self, page):
        """Click the first visible share/publish button.

        Logs a warning when no candidate button becomes visible. All errors
        are logged and swallowed.
        """
        try:
            await page.wait_for_timeout(2000)  # let the share button appear
            # The locator is only returned once it is actually visible, so the
            # "not found" branch below is reachable.
            _, publish_button = await self._first_visible(page, self.PUBLISH_SELECTORS)
            if publish_button is None:
                ins_logger.warning("未找到发布按钮")
                return

            await publish_button.click()
            ins_logger.info("成功点击发布按钮")
            # Fixed wait for the publish request to complete.
            await page.wait_for_timeout(5000)
            ins_logger.success("Instagram内容发布成功")
        except Exception as e:
            ins_logger.error(f"发布时出错: {e}")
async def main():
    """Manual test entry point: ensure a cookie exists, then upload a demo video.

    Uses ``cookies/ins_uploader/account.json`` and ``videos/demo.mp4`` under
    the directory three levels above this file. Returns early (after logging)
    when the cookie setup fails or the demo video is missing.
    """
    repo_root = Path(__file__).parent.parent.parent

    # Cookie file location; create its directory if needed.
    account_file = repo_root / "cookies" / "ins_uploader" / "account.json"
    account_file.parent.mkdir(exist_ok=True, parents=True)

    # A valid cookie is required before anything else.
    cookie_ready = await ins_setup(str(account_file), handle=True)
    if not cookie_ready:
        ins_logger.error("Cookie设置失败")
        return

    video_path = repo_root / "videos" / "demo.mp4"
    if not video_path.exists():
        ins_logger.error(f"测试视频文件不存在: {video_path}")
        return

    upload_kwargs = dict(
        title="测试Instagram上传",
        file_path=str(video_path),
        tags=["test", "instagram", "upload"],
        publish_date=datetime.now(),
        account_file=str(account_file),
        headless=False,
    )
    ins_video = InsVideo(**upload_kwargs)

    # Run the upload with a fresh Playwright instance.
    async with async_playwright() as playwright:
        await ins_video.upload(playwright)
# Script entry point: run the manual upload test when executed directly.
if __name__ == '__main__':
    asyncio.run(main())