2025-09-08 09:32:45 +08:00

485 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
import sys
import os
# 添加项目根目录到 Python 路径
current_dir = Path(__file__).parent.resolve()
project_root = current_dir.parent.parent
sys.path.append(str(project_root))
from conf import LOCAL_CHROME_PATH, BASE_DIR
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
#from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import X_logger
from utils.files_times import generate_schedule_time_next_day
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://mp.weixin.qq.com/")
#await page.wait_for_load_state('networkidle')
await page.wait_for_load_state('domcontentloaded', timeout=30000)
try:
# 选择所有的 select 元素
select_elements = await page.query_selector_all('select')
for element in select_elements:
class_name = await element.get_attribute('class')
# 使用正则表达式匹配特定模式的 class 名称
if re.match(r'ins-.*-SelectFormContainer.*', class_name):
X_logger.error("[+] cookie expired")
return False
X_logger.success("[+] cookie valid")
return True
except:
X_logger.success("[+] cookie valid")
return True
async def wechat_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "https://mp.weixin.qq.com/")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
X_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
await get_wechat_cookie(account_file)
return True
async def get_wechat_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang zh_CN',
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://mp.weixin.qq.com/")
await page.pause()
# 点击调试器的继续保存cookie
await context.storage_state(path=account_file)
# if __name__ == '__main__':
# account_file = Path(BASE_DIR / "cookies" / "wechat_uploader" / "account.json")
# account_file.parent.mkdir(exist_ok=True)
# cookie_setup = asyncio.run(wechat_setup(str(account_file), handle=True))
class WechatVideo(object):
def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None):
self.title = title
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.thumbnail_path = thumbnail_path
self.account_file = account_file
self.local_executable_path = LOCAL_CHROME_PATH
self.locator_base = None
# click title to remove the focus.
# await self.locator_base.locator("h1:has-text('Upload video')").click()
async def set_schedule_time_wechat(self, page, publish_date):
# 先点击定时发表开关,启用定时发布功能
await page.locator('.mass-send__td-setting > .publish_container > .weui-desktop-form__controls > .mass-send__timer-wrp > .weui-desktop-switch').click()
await asyncio.sleep(1)
deta = (publish_date.date()-datetime.now().date()).days
if deta == 0:
day_str = "今天"
if deta == 1:
day_str = "明天"
else:
day_str = publish_date.strftime("%#m月%#d")
time_str = publish_date.strftime("%H:%M")
await asyncio.sleep(1)
# 点击日期下拉框选择第3个匹配的元素即日期选择器
await page.locator('dt.weui-desktop-form__dropdown__dt.weui-desktop-form__dropdown__inner-button').nth(2).click()
await asyncio.sleep(1)
# 在下拉列表中查找并点击对应的日期
dropdown_option = page.locator(f'span.weui-desktop-dropdown__list-ele__text:has-text("{day_str}")')
await dropdown_option.click()
await asyncio.sleep(1)
# 选择时间(点击第二个时间选择器容器)
await page.locator('dl.weui-desktop-picker__time').nth(1).click()
await page.keyboard.press("Control+KeyA")
await page.keyboard.type(str(time_str))
await asyncio.sleep(0.5)
# 点击页面其他地方关闭时间选择下拉栏,使发表按钮显示
await page.locator('body').click()
await asyncio.sleep(1)
# 点击发表按钮
await page.locator('button.weui-desktop-btn.weui-desktop-btn_primary:has-text("发表")').first.click()
await asyncio.sleep(1)
async def handle_upload_error(self, page):
X_logger.info("微信公众号上传错误,正在重试...")
try:
# 重新查找文件上传元素并重试
upload_element = page.locator('input[type="file"]')
await upload_element.wait_for(state='visible', timeout=10000)
await upload_element.set_input_files(self.file_path)
X_logger.info("重试上传成功")
except Exception as e:
X_logger.error(f"重试上传失败: {e}")
async def upload(self, playwright: Playwright) -> None:
browser = await playwright.chromium.launch(headless=False, executable_path=self.local_executable_path)
context = await browser.new_context(storage_state=f"{self.account_file}")
context = await set_init_script(context)
page = await context.new_page()
# 访问微信公众号管理页面
await page.goto("https://mp.weixin.qq.com/")
X_logger.info(f'[+]正在上传到微信公众号-------{self.title}')
await page.wait_for_load_state('domcontentloaded', timeout=30000)
try:
# 等待页面加载完成
await page.wait_for_timeout(3000)
# 点击图文创作按钮,并处理可能的新页面
new_page = await self.click_image_text_creation(page, context)
# 如果有新页面,使用新页面进行后续操作
working_page = new_page if new_page else page
# 等待创作页面加载
await working_page.wait_for_timeout(3000)
# 上传图片或视频文件
await self.upload_media_file(working_page)
# 添加标题和内容
await self.add_title_content(working_page)
# 发布内容
await self.click_publish(working_page)
await self.set_schedule_time_wechat(working_page, self.publish_date)
except Exception as e:
X_logger.error(f"上传过程中出现错误: {e}")
# 处理上传错误
await self.handle_upload_error(page)
await context.storage_state(path=f"{self.account_file}") # save cookie
X_logger.info(' [-] 更新cookie成功')
await asyncio.sleep(2) # close delay for look the video status
# close all
await context.close()
await browser.close()
async def click_image_text_creation(self, page, context):
"""点击图文创作按钮,跳转到创作页面,返回新页面对象(如果有)"""
try:
# 根据用户提供的HTML结构查找图文创作按钮
image_text_button = page.locator('div.new-creation__menu-item:has(div.new-creation__menu-title:has-text("图文"))')
await image_text_button.wait_for(state='visible', timeout=15000)
# 尝试检测新页面
try:
# 监听新页面创建事件,设置短超时
async with context.expect_page(timeout=3000) as new_page_info:
# 点击图文创作按钮
await image_text_button.click()
X_logger.info("成功点击图文创作按钮")
# 获取新页面
new_page = await new_page_info.value
await new_page.wait_for_load_state('domcontentloaded', timeout=30000)
X_logger.info("检测到新页面,将在新页面中进行后续操作")
return new_page
except:
# 如果没有检测到新页面,等待当前页面跳转
X_logger.info("未检测到新页面,等待当前页面跳转")
await page.wait_for_timeout(2000)
try:
await page.wait_for_load_state('domcontentloaded', timeout=15000)
except:
pass # 忽略超时错误
X_logger.info("在当前页面中跳转到创作页面")
return None
except Exception as e:
X_logger.error(f"点击图文创作按钮失败: {e}")
# 备用方案使用SVG图标定位
try:
svg_button = page.locator('div.new-creation__menu-item:has(svg)')
# 尝试检测新页面
try:
# 监听新页面创建事件,设置短超时
async with context.expect_page(timeout=3000) as new_page_info:
await svg_button.first.click()
X_logger.info("使用备用方案成功点击图文创作按钮")
# 获取新页面
new_page = await new_page_info.value
await new_page.wait_for_load_state('domcontentloaded', timeout=30000)
X_logger.info("检测到新页面,将在新页面中进行后续操作")
return new_page
except:
# 如果没有检测到新页面,等待当前页面跳转
X_logger.info("备用方案:未检测到新页面,等待当前页面跳转")
await page.wait_for_timeout(2000)
try:
await page.wait_for_load_state('domcontentloaded', timeout=15000)
except:
pass # 忽略超时错误
X_logger.info("在当前页面中跳转到创作页面")
return None
except Exception as backup_e:
X_logger.error(f"备用方案也失败: {backup_e}")
raise
async def add_title_content(self, page):
"""添加文章标题和内容"""
try:
# 等待编辑器加载
await page.wait_for_timeout(2000)
# 查找标题输入框 - 使用微信公众号的实际选择器
title_selectors = [
'textarea#title[placeholder*="请在这里输入标题"]', # 精确匹配您提供的标题框
'textarea#title', # 备用选择器
'input[placeholder*="标题"], input[placeholder*="请输入标题"]', # 通用备用
'textarea[placeholder*="标题"]' # textarea类型的标题框
]
title_added = False
for selector in title_selectors:
try:
title_input = page.locator(selector)
if await title_input.count() > 0:
await title_input.first.click()
await title_input.first.fill(self.title)
X_logger.info(f"成功添加标题: {self.title} (使用选择器: {selector})")
title_added = True
break
except Exception as e:
X_logger.debug(f"标题选择器 {selector} 失败: {e}")
continue
if not title_added:
X_logger.warning("未找到标题输入框")
# 查找内容编辑区域 - 使用微信公众号的ProseMirror编辑器
content_selectors = [
'div.ProseMirror[contenteditable="true"]', # 精确匹配您提供的内容编辑器
'div[contenteditable="true"][translate="no"]', # 更具体的选择器
'div[contenteditable="true"]', # 通用备用
'textarea[placeholder*="内容"]' # textarea类型的内容框
]
content_added = False
for selector in content_selectors:
try:
content_editor = page.locator(selector)
if await content_editor.count() > 0:
await content_editor.first.click()
# 清除现有内容(包括占位符)
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
# 构建内容,包含标签
content = self.title + "\n\n"
if self.tags:
content += "\n".join([f"#{tag}" for tag in self.tags])
# 逐字符输入内容,确保在富文本编辑器中正确显示
await page.keyboard.type(content)
X_logger.info(f"成功添加文章内容和标签 (使用选择器: {selector})")
content_added = True
break
except Exception as e:
X_logger.debug(f"内容选择器 {selector} 失败: {e}")
continue
if not content_added:
X_logger.warning("未找到内容编辑区域")
except Exception as e:
X_logger.error(f"添加标题和内容时出错: {e}")
async def upload_media_file(self, page):
"""上传图片或视频文件"""
try:
# 查找文件上传按钮或区域优先使用微信公众号的透明label上传按钮
upload_buttons = [
'label[style*="opacity: 0"][style*="width: 100%"][style*="height: 100%"][style*="cursor: pointer"]', # 精确匹配您提供的样式
'label[style*="opacity: 0"][style*="cursor: pointer"]', # 微信公众号透明上传按钮
'label[style*="opacity: 0"]', # 更通用的透明label选择器
'input[type="file"]',
'button:has-text("插入图片")',
'button:has-text("上传图片")',
'div:has-text("点击上传")',
'.upload-btn'
]
uploaded = False
for selector in upload_buttons:
try:
upload_element = page.locator(selector)
if await upload_element.count() > 0:
X_logger.info(f"找到上传元素,使用选择器: {selector}")
if selector == 'input[type="file"]':
await upload_element.first.set_input_files(self.file_path)
else:
# 对于透明label和其他元素先悬停再点击触发文件选择器
element = upload_element.first
# 确保元素可见
await element.scroll_into_view_if_needed()
await page.wait_for_timeout(300)
# 获取元素的边界框
box = await element.bounding_box()
if box:
# 计算元素中心点
center_x = box['x'] + box['width'] / 2
center_y = box['y'] + box['height'] / 2
# 先移动鼠标到元素中心位置
await page.mouse.move(center_x, center_y)
X_logger.info(f"鼠标移动到坐标: ({center_x}, {center_y})")
# 稍等一下让透明按钮显示
await page.wait_for_timeout(800)
# 再次悬停确保状态
await element.hover()
await page.wait_for_timeout(300)
async with page.expect_file_chooser() as fc_info:
await element.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
else:
# 如果无法获取边界框,使用普通悬停
await element.hover()
await page.wait_for_timeout(500)
async with page.expect_file_chooser() as fc_info:
await element.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
X_logger.info("成功上传媒体文件")
uploaded = True
break
except Exception as e:
X_logger.debug(f"选择器 {selector} 失败: {e}")
continue
if not uploaded:
X_logger.warning("未找到文件上传元素,跳过文件上传")
except Exception as e:
X_logger.error(f"上传媒体文件时出错: {e}")
async def click_publish(self, page):
"""点击发布按钮发布文章"""
try:
# 等待页面稳定
await page.wait_for_timeout(2000)
# 查找微信公众号的发布按钮
publish_selectors = [
'button.mass_send',
'button:has-text("发表")',
'button:has-text("群发")',
'button:has-text("发布")',
'button[class*="publish"]',
'.btn-publish'
]
published = False
for selector in publish_selectors:
try:
publish_button = page.locator(selector)
if await publish_button.count() > 0:
await publish_button.first.click()
X_logger.info("成功点击发布按钮")
published = True
break
except Exception:
continue
if not published:
X_logger.warning("未找到发布按钮,可能需要手动发布")
# 等待发布完成
await page.wait_for_timeout(3000)
X_logger.info("文章发布完成")
except Exception as e:
X_logger.error(f"点击发布按钮时出错: {e}")
async def main():
"""用于测试微信公众号上传功能的主函数"""
from pathlib import Path
# 配置文件路径
account_file = Path(__file__).parent.parent.parent / "cookies" / "wechat_uploader" / "account.json"
account_file.parent.mkdir(exist_ok=True, parents=True)
# 首先确保cookie有效
if not await wechat_setup(str(account_file), handle=True):
X_logger.error("Cookie设置失败")
return
# 创建测试图片对象(微信公众号可上传图片或视频)
image_path = Path(__file__).parent.parent.parent / "videos" / "demo.png"
if not image_path.exists():
X_logger.error(f"测试图片文件不存在: {image_path}")
return
publish_datetimes = generate_schedule_time_next_day(1, 1, daily_times=[16])
wechat_video = WechatVideo(
title="测试微信公众号图文发布",
file_path=str(image_path),
tags=["测试", "微信公众号", "图文"],
publish_date=publish_datetimes[0],
account_file=str(account_file)
)
# 开始上传
async with async_playwright() as playwright:
await wechat_video.upload(playwright)
if __name__ == '__main__':
asyncio.run(main())