483 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
from conf import LOCAL_CHROME_PATH
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import tencent_logger
from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page
from utils.human_typing_wrapper import create_human_typer
def format_str_for_short_title(origin_title: str) -> str:
    """Derive a short title from ``origin_title``.

    Alphanumeric characters and a small set of permitted symbols are kept,
    each ASCII comma becomes a single space, and every other character is
    dropped. The result is then cut to at most 16 characters, or padded on
    the right with spaces up to 6 characters when it is shorter than that.

    Args:
        origin_title: The full title to condense.

    Returns:
        The filtered title, between 6 and 16 characters long.
    """
    # Symbols that survive filtering even though they are not alphanumeric.
    permitted_symbols = "《》“”:+?%°"

    kept = []
    for ch in origin_title:
        if ch.isalnum() or ch in permitted_symbols:
            kept.append(ch)
        elif ch == ',':
            kept.append(' ')
        # Anything else is discarded.

    short_title = ''.join(kept)

    # Too long: truncate. Otherwise pad to the minimum width (ljust leaves
    # strings that already have 6+ characters untouched).
    if len(short_title) > 16:
        return short_title[:16]
    return short_title.ljust(6)
async def cookie_auth(account_file):
    """Check whether the login state stored in ``account_file`` still works.

    Opens the post-creation page in headless Chromium using the stored state
    and waits up to five seconds for a ``div.title-name`` element containing
    "微信小店". This function treats that element appearing as the sign that
    the cookie is no longer valid.

    Args:
        account_file: Path to a Playwright storage-state file.

    Returns:
        False if the marker element appears within the timeout, True if the
        wait fails (normally by timing out).
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context(storage_state=account_file)
            context = await set_init_script(context)
            page = await context.new_page()
            await page.goto("https://channels.weixin.qq.com/platform/post/create")
            try:
                await page.wait_for_selector('div.title-name:has-text("微信小店")', timeout=5000)
            except Exception:
                # `except Exception` rather than a bare `except`, so that task
                # cancellation and KeyboardInterrupt propagate instead of
                # being reported as a valid cookie.
                tencent_logger.success("[+] cookie 有效")
                return True
            tencent_logger.error("[+] 等待5秒 cookie 失效")
            return False
        finally:
            # Always release the browser, whichever branch returned or raised.
            await browser.close()
async def get_tencent_cookie(account_file):
    """Open a headed browser for a manual login and save the resulting state.

    The page is paused with ``page.pause()``; once execution is resumed from
    the Playwright debugger, the context's storage state is written to
    ``account_file``.

    Args:
        account_file: Destination path for the storage-state file.
    """
    async with async_playwright() as playwright:
        # Headed mode is required: the user has to interact with the page.
        # NOTE(review): Chromium switches are usually written `--lang=en-GB`;
        # confirm that this space-separated form actually takes effect.
        browser = await playwright.chromium.launch(
            args=['--lang en-GB'],
            headless=False,
        )
        fresh_context = await browser.new_context()
        context = await set_init_script(fresh_context)
        page = await context.new_page()
        await page.goto("https://channels.weixin.qq.com")
        # Execution stops here until "resume" is clicked in the debugger;
        # only then is the cookie saved.
        await page.pause()
        await context.storage_state(path=account_file)
async def weixin_setup(account_file, handle=False):
    """Make sure a usable cookie file exists for the Tencent uploader.

    Args:
        account_file: Cookie file path, resolved through ``get_absolute_path``
            with the ``"tencent_uploader"`` base.
        handle: When True, a missing or rejected cookie triggers the
            interactive login flow; when False the function just reports
            failure.

    Returns:
        True if the cookie is valid or the login flow was run, False if the
        cookie is unusable and ``handle`` is False.
    """
    account_file = get_absolute_path(account_file, "tencent_uploader")

    # The validity check only runs when the file exists (short-circuit).
    cookie_ok = os.path.exists(account_file) and await cookie_auth(account_file)
    if cookie_ok:
        return True

    if not handle:
        # TODO: alert message
        return False

    tencent_logger.info('[+] cookie文件不存在或已失效即将自动打开浏览器请扫码登录登陆后会自动生成cookie文件')
    await get_tencent_cookie(account_file)
    return True
class TencentVideo(object):
    """Publish one video to WeChat Channels by driving the web creator page.

    The public entry point is :meth:`main`, which starts Playwright and runs
    :meth:`upload`. The remaining methods each handle one step of the
    post-creation form.
    """

    def __init__(self, title, file_path, tags, publish_date: datetime, account_file, category=None, headless=True):
        """Store the upload parameters.

        Args:
            title: Video title; also the source text for the short title.
            file_path: Path of the video file given to the page's file input.
            tags: Sequence of hashtag strings; each is typed as ``#tag``.
            publish_date: Scheduled publish time. The value ``0`` skips the
                scheduling step.
            account_file: Storage-state (cookie) file; passed to the browser
                context and rewritten after a successful publish.
            category: Original-content category label. When falsy, the newer
                "declare original" dialog is not filled in.
            headless: Whether the browser runs headless.
        """
        self.title = title
        self.file_path = file_path
        self.tags = tags
        self.publish_date = publish_date
        self.account_file = account_file
        self.category = category
        self.local_executable_path = LOCAL_CHROME_PATH
        self.headless = headless

    async def set_schedule_time_tencent(self, page, publish_date):
        """Fill in the scheduled-publish date and hour on the form.

        Only the day of month and the hour are set. The picker is advanced by
        at most one month, and nothing is typed for the minutes.

        Args:
            page: The Playwright page showing the post-creation form.
            publish_date: Target publish time (``month``, ``day`` and ``hour``
                are read).
        """
        label_element = page.locator("label").filter(has_text="定时").nth(1)
        await label_element.click()
        await page.click('input[placeholder="请选择发表时间"]')

        # The picker header shows the month as a zero-padded number followed
        # by "月". The suffix has to be present both in the selector and in
        # the comparison value; with an empty suffix the two can never match
        # and the picker would always jump to the next month.
        # NOTE(review): "月" was restored here on that reasoning - confirm
        # against the live page markup.
        current_month = f"{publish_date.month:02d}" + "月"
        page_month = await page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')

        # Move to the next month when the displayed one is not the target.
        if page_month != current_month:
            await page.click('button.weui-desktop-btn__icon__right')

        # Click the first enabled day cell whose text equals the target day.
        target_day = str(publish_date.day)
        elements = await page.query_selector_all('table.weui-desktop-picker__table a')
        for element in elements:
            if 'weui-desktop-picker__disabled' in await element.evaluate('el => el.className'):
                continue
            text = await element.inner_text()
            if text.strip() == target_day:
                await element.click()
                break

        # Replace the content of the time field with the target hour.
        await page.click('input[placeholder="请选择时间"]')
        await page.keyboard.press("Control+KeyA")
        await page.keyboard.type(str(publish_date.hour))

        # Click the title editor so the scheduled time takes effect.
        await page.locator("div.input-editor").click()

    async def handle_upload_error(self, page):
        """Delete the failed video on the page and submit the file again."""
        tencent_logger.info("视频出错了,重新上传中")
        await page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
        await page.get_by_role('button', name="删除", exact=True).click()
        file_input = page.locator('input[type="file"]')
        await file_input.set_input_files(self.file_path)

    async def upload(self, playwright: Playwright) -> None:
        """Run the whole publish flow in a freshly created stealth browser.

        The context and browser are closed even when a step raises, so a
        failed upload does not leave a browser process behind.

        Args:
            playwright: An active Playwright instance.
        """
        # Use the shared anti-detection helpers. (Original author's note: the
        # system browser is used here because chromium causes an h264 error.)
        browser = await create_stealth_browser(
            playwright=playwright,
            headless=self.headless,
            executable_path=self.local_executable_path
        )
        try:
            context = await create_stealth_context(
                browser=browser,
                account_file=self.account_file,
                headless=self.headless
            )
            try:
                await self._publish_in_context(context)
            finally:
                await context.close()
        finally:
            await browser.close()

    async def _publish_in_context(self, context):
        """Open the creator page in ``context`` and walk through every form step."""
        page = await setup_stealth_page(context, "https://channels.weixin.qq.com/platform/post/create")
        # Wrapper used for human-like typing.
        human_typer = create_human_typer(page)
        tencent_logger.info(f'[+]正在上传-------{self.title}.mp4')

        # Wait until the page is on the expected URL (waits until timeout otherwise).
        await page.wait_for_url("https://channels.weixin.qq.com/platform/post/create")
        await self._wait_for_page_ready(page)

        await self.handle_file_upload(page)
        # Fill in the title and hashtags.
        await self.add_title_tags(page, human_typer)
        # Attaching a product is currently disabled; no such step is run here.
        # Add to a collection when one is offered.
        await self.add_collection(page)
        # Declare the video as original content.
        await self.add_original(page)
        # Wait for the upload to complete.
        await self.detect_upload_status(page)
        if self.publish_date != 0:
            await self.set_schedule_time_tencent(page, self.publish_date)
        await self.add_short_title(page)
        await self.click_publish(page)

        # Persist the refreshed cookie.
        await context.storage_state(path=f"{self.account_file}")
        tencent_logger.success(' [-]cookie更新完毕')
        # Short delay so the result can be observed before the browser closes.
        await asyncio.sleep(2)

    async def _wait_for_page_ready(self, page):
        """Best-effort wait for the page to load; never raises on a timeout."""
        try:
            # First wait for the DOM content.
            await page.wait_for_load_state('domcontentloaded', timeout=10000)
            tencent_logger.info("页面DOM加载完成")
            # Then try for network idle, with a shorter timeout.
            try:
                await page.wait_for_load_state('networkidle', timeout=8000)
                tencent_logger.info("页面网络活动静止")
            except Exception:
                tencent_logger.warning("网络空闲等待超时,继续执行...")
            # Give the page a moment to settle.
            await page.wait_for_timeout(3000)
        except Exception as e:
            tencent_logger.warning(f"页面加载等待出现问题: {e}")
            # Carry on even if loading had problems.
            await page.wait_for_timeout(2000)

    async def add_short_title(self, page):
        """Fill the "短标题" input, if the form has one, from ``self.title``."""
        short_title_element = page.get_by_text("短标题", exact=True).locator("..").locator(
            "xpath=following-sibling::div").locator(
            'span input[type="text"]')
        if not await short_title_element.count():
            return

        short_title = format_str_for_short_title(self.title)
        human_typer = create_human_typer(page)

        # Focus the input and select its current content so typing replaces it.
        await short_title_element.click()
        await page.wait_for_timeout(200)
        await page.keyboard.press("Control+A")
        await page.wait_for_timeout(100)

        try:
            # The field already has focus, so the typer's internal character
            # input routine is called directly.
            await human_typer._type_text_continuously(short_title)
        except Exception as e:
            tencent_logger.warning(f"短标题人类化输入失败: {e}")
            tencent_logger.warning("短标题人类化输入失败,使用传统方式")
            await short_title_element.fill(short_title)
        else:
            tencent_logger.info("短标题人类化输入成功")

    async def click_publish(self, page):
        """Click the publish button until the page reaches the post list.

        Loops without an upper bound: every failed attempt is logged and
        retried after half a second.
        """
        list_url = "https://channels.weixin.qq.com/platform/post/list"
        while True:
            try:
                publish_button = page.locator('div.form-btns button:has-text("发表")')
                if await publish_button.count():
                    await publish_button.click()
                await page.wait_for_url(list_url, timeout=5000)
                tencent_logger.success(" [-]视频发布成功")
                break
            except Exception as e:
                # The navigation may have happened even though the wait failed.
                if list_url in page.url:
                    tencent_logger.success(" [-]视频发布成功")
                    break
                tencent_logger.exception(f" [-] Exception: {e}")
                tencent_logger.info(" [-] 视频正在发布中...")
                await asyncio.sleep(0.5)

    async def detect_upload_status(self, page):
        """Poll every two seconds until the publish button is no longer disabled.

        While waiting, an error status shown together with a delete tag
        triggers :meth:`handle_upload_error`.
        """
        while True:
            try:
                # The publish button losing its disabled class is the signal
                # that the upload finished.
                button_class = await page.get_by_role("button", name="发表").get_attribute('class')
                if "weui-desktop-btn_disabled" not in button_class:
                    tencent_logger.info(" [-]视频上传完毕")
                    break
                tencent_logger.info(" [-] 正在上传视频中...")
                await asyncio.sleep(2)
                # The upload itself reported an error: delete and retry.
                if await page.locator('div.status-msg.error').count() and await page.locator(
                        'div.media-status-content div.tag-inner:has-text("删除")').count():
                    tencent_logger.error(" [-] 发现上传出错了...准备重试")
                    await self.handle_upload_error(page)
            except Exception:
                # Any lookup failure (including a missing class attribute) is
                # treated as "still uploading". `except Exception` rather than
                # a bare `except`, so cancelling the task can end this loop.
                tencent_logger.info(" [-] 正在上传视频中...")
                await asyncio.sleep(2)

    async def add_title_tags(self, page, human_typer):
        """Type the title and hashtags into the description editor.

        Args:
            page: The Playwright page showing the post-creation form.
            human_typer: Typing wrapper; ``type_text_human`` is tried first
                and plain keyboard input is used when it reports failure.
        """
        # Full content: title, newline, then space-separated "#tag" entries.
        tags_text = " ".join(f"#{tag}" for tag in self.tags) + " "
        full_content = self.title + "\n" + tags_text

        success = await human_typer.type_text_human(
            "div.input-editor",
            full_content,
            clear_first=True
        )
        if not success:
            tencent_logger.warning("人类化输入失败,使用传统方式")
            await page.locator("div.input-editor").click()
            await page.keyboard.type(self.title)
            await page.keyboard.press("Enter")
            for tag in self.tags:
                await page.keyboard.type("#" + tag)
                await page.keyboard.press("Space")
        tencent_logger.info(f"成功添加hashtag: {len(self.tags)}")

    async def add_collection(self, page):
        """Pick the first collection when more than one option entry exists."""
        collection_box = page.get_by_text("添加到合集").locator("xpath=following-sibling::div")
        collection_elements = collection_box.locator('.option-list-wrap > div')
        if await collection_elements.count() > 1:
            await collection_box.click()
            await collection_elements.first.click()

    async def add_original(self, page):
        """Tick the "original content" declaration, covering both page layouts."""
        if await page.get_by_label("视频为原创").count():
            await page.get_by_label("视频为原创").check()

        # Older layout: agree to the terms label if it is visible, then confirm.
        terms_visible = await page.locator('label:has-text("我已阅读并同意 《视频号原创声明使用条款》")').is_visible()
        if terms_visible:
            await page.get_by_label("我已阅读并同意 《视频号原创声明使用条款》").check()
            await page.get_by_role("button", name="声明原创").click()

        # WeChat update of 2023-11-20: new or migrated accounts may get a
        # different declaration dialog. It is only handled when a category is set.
        if await page.locator('div.label span:has-text("声明原创")').count() and self.category:
            # The checkbox can be disabled because of a penalty, so check first.
            declare_checkbox = page.locator('div.declare-original-checkbox input.ant-checkbox-input')
            if not await declare_checkbox.is_disabled():
                await declare_checkbox.click()
                if not await page.locator(
                        'div.declare-original-dialog label.ant-checkbox-wrapper.ant-checkbox-wrapper-checked:visible').count():
                    await page.locator('div.declare-original-dialog input.ant-checkbox-input:visible').click()
            if await page.locator('div.original-type-form > div.form-label:has-text("原创类型"):visible').count():
                # Open the dropdown and choose the configured category.
                await page.locator('div.form-content:visible').click()
                await page.locator(
                    f'div.form-content:visible ul.weui-desktop-dropdown__list li.weui-desktop-dropdown__list-ele:has-text("{self.category}")').first.click()
                await page.wait_for_timeout(1000)
            if await page.locator('button:has-text("声明原创"):visible').count():
                await page.locator('button:has-text("声明原创"):visible').click()

    async def _locate_file_input(self, page):
        """Return ``(locator, label)`` for the first matching file input, or ``(None, None)``."""
        # Candidate selectors, in priority order.
        upload_selectors = [
            'input[type="file"]',
            'input[accept*="video"]',
            'input[accept*="mp4"]',
            'input[accept*=".mp4"]',
            '.upload-input',
            '[data-testid*="upload"]',
            'input[accept*="*"]'  # generic file input
        ]
        for selector in upload_selectors:
            try:
                tencent_logger.debug(f"尝试选择器: {selector}")
                elements = page.locator(selector)
                element_count = await elements.count()
                if element_count <= 0:
                    continue
                tencent_logger.info(f"找到 {element_count} 个匹配元素: {selector}")
                for i in range(element_count):
                    try:
                        element = elements.nth(i)
                        is_visible = await element.is_visible()
                        is_enabled = await element.is_enabled()
                        tencent_logger.debug(f"元素 {i}: visible={is_visible}, enabled={is_enabled}")
                        # The state is only logged: the element is accepted
                        # even when hidden, as file inputs usually still work.
                        return element, f"{selector}[{i}]"
                    except Exception as e:
                        tencent_logger.debug(f"元素 {i} 检查失败: {e}")
            except Exception as e:
                tencent_logger.debug(f"选择器 {selector} 失败: {e}")
        return None, None

    async def handle_file_upload(self, page):
        """Locate the file input and submit ``self.file_path`` to it.

        Several selectors are tried to find the input, and the submission
        itself is attempted up to three times.

        Raises:
            Exception: If no file input is found, or if every upload attempt
                fails (the last error is chained as the cause).
        """
        try:
            tencent_logger.info("开始查找文件上传元素...")
            # Let the page settle first.
            await page.wait_for_timeout(2000)

            file_input, successful_selector = await self._locate_file_input(page)
            if file_input is None:
                tencent_logger.error("未找到可用的文件上传元素")
                # Dump page information to help diagnose the failure.
                await self.debug_page_state(page)
                raise Exception("无法找到文件上传元素")
            tencent_logger.info(f"使用元素: {successful_selector}")

            max_attempts = 3
            last_error = None
            for attempt in range(max_attempts):
                try:
                    tencent_logger.info(f"尝试上传文件 (第{attempt + 1}次): {self.file_path}")
                    await file_input.wait_for(state='attached', timeout=5000)
                    await file_input.set_input_files(self.file_path)
                    # Brief pause after submitting; no further check is made.
                    await page.wait_for_timeout(2000)
                    tencent_logger.success("文件上传成功")
                    return
                except Exception as e:
                    last_error = e
                    tencent_logger.warning(f"{attempt + 1}次上传尝试失败: {e}")
                    if attempt < max_attempts - 1:
                        # Wait before the next attempt.
                        await page.wait_for_timeout(1000)
            raise Exception("多次尝试后文件上传仍然失败") from last_error
        except Exception as e:
            tencent_logger.error(f"文件上传失败: {e}")
            raise

    async def debug_page_state(self, page):
        """Log diagnostic information about the page; never raises.

        Logs the URL, title, up to three texts per error-like selector and the
        HTML length. In headless mode it also tries to save a screenshot.
        """
        try:
            tencent_logger.info("🔍 调试页面状态...")
            url = page.url
            title = await page.title()
            tencent_logger.info(f"当前URL: {url}")
            tencent_logger.info(f"页面标题: {title}")

            # Look for elements that appear to carry error messages.
            error_selectors = [
                '.error', '.error-message', '.warning',
                '[class*="error"]', '[class*="Error"]'
            ]
            for selector in error_selectors:
                try:
                    elements = await page.query_selector_all(selector)
                    for element in elements[:3]:  # only the first three
                        text = await element.inner_text()
                        if text.strip():
                            tencent_logger.warning(f"发现错误信息: {text}")
                except Exception:
                    # Diagnostics are best-effort; skip selectors that fail.
                    pass

            # A very short document suggests the page did not load fully.
            content = await page.content()
            tencent_logger.info(f"页面HTML长度: {len(content)} 字符")
            if len(content) < 5000:
                tencent_logger.warning("⚠️ 页面内容较短,可能加载不完整")

            # In headless mode a screenshot may show what the page looks like.
            if self.headless:
                try:
                    await page.screenshot(path=f"debug_tencent_{self.headless}.png")
                    tencent_logger.info("已保存调试截图")
                except Exception:
                    # A failed screenshot must not break the diagnostics.
                    pass
        except Exception as e:
            tencent_logger.error(f"调试页面状态时出错: {e}")

    async def main(self):
        """Start Playwright and run :meth:`upload`."""
        async with async_playwright() as playwright:
            await self.upload(playwright)