401 lines
17 KiB
Python
401 lines
17 KiB
Python
from pathlib import Path
|
||
import sys
|
||
import os
|
||
|
||
# 添加项目根目录到 Python 路径
|
||
current_dir = Path(__file__).parent.resolve()
|
||
project_root = current_dir.parent.parent
|
||
sys.path.append(str(project_root))
|
||
|
||
from conf import LOCAL_CHROME_PATH, BASE_DIR
|
||
import re
|
||
from datetime import datetime
|
||
|
||
from playwright.async_api import Playwright, async_playwright
|
||
import os
|
||
import asyncio
|
||
#from uploader.tk_uploader.tk_config import Tk_Locator
|
||
from utils.base_social_media import set_init_script
|
||
from utils.files_times import get_absolute_path
|
||
from utils.log import X_logger
|
||
from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page
|
||
from utils.human_typing_wrapper import create_human_typer
|
||
|
||
|
||
async def cookie_auth(account_file):
|
||
async with async_playwright() as playwright:
|
||
browser = await playwright.chromium.launch(headless=True)
|
||
context = await browser.new_context(storage_state=account_file)
|
||
context = await set_init_script(context)
|
||
# 创建一个新的页面
|
||
page = await context.new_page()
|
||
# 访问指定的 URL
|
||
await page.goto("https://www.goofish.com/search")
|
||
#await page.wait_for_load_state('networkidle')
|
||
await page.wait_for_load_state('domcontentloaded', timeout=30000)
|
||
try:
|
||
# 选择所有的 select 元素
|
||
select_elements = await page.query_selector_all('select')
|
||
for element in select_elements:
|
||
class_name = await element.get_attribute('class')
|
||
# 使用正则表达式匹配特定模式的 class 名称
|
||
if re.match(r'ins-.*-SelectFormContainer.*', class_name):
|
||
X_logger.error("[+] cookie expired")
|
||
return False
|
||
X_logger.success("[+] cookie valid")
|
||
return True
|
||
except:
|
||
X_logger.success("[+] cookie valid")
|
||
return True
|
||
|
||
|
||
async def xianyu_setup(account_file, handle=False):
|
||
account_file = get_absolute_path(account_file, "https://www.goofish.com/search")
|
||
if not os.path.exists(account_file) or not await cookie_auth(account_file):
|
||
if not handle:
|
||
return False
|
||
X_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
|
||
await get_xianyu_cookie(account_file)
|
||
return True
|
||
|
||
|
||
async def get_xianyu_cookie(account_file):
|
||
async with async_playwright() as playwright:
|
||
options = {
|
||
'args': [
|
||
'--lang en-GB',
|
||
],
|
||
'headless': False, # Set headless option here
|
||
}
|
||
# Make sure to run headed.
|
||
browser = await playwright.chromium.launch(**options)
|
||
# Setup context however you like.
|
||
context = await browser.new_context() # Pass any options
|
||
context = await set_init_script(context)
|
||
# Pause the page, and start recording manually.
|
||
page = await context.new_page()
|
||
await page.goto("https://www.goofish.com/search")
|
||
await page.pause()
|
||
# 点击调试器的继续,保存cookie
|
||
await context.storage_state(path=account_file)
|
||
|
||
# if __name__ == '__main__':
|
||
# account_file = Path(BASE_DIR / "cookies" / "xianyu_uploader" / "account.json")
|
||
# account_file.parent.mkdir(exist_ok=True)
|
||
# cookie_setup = asyncio.run(xianyu_setup(str(account_file), handle=True))
|
||
|
||
class XianyuVideo(object):
|
||
def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None, price=None, original_price=None,headless=True):
|
||
self.title = title
|
||
self.file_path = file_path
|
||
self.tags = tags
|
||
self.publish_date = publish_date
|
||
self.thumbnail_path = thumbnail_path
|
||
self.account_file = account_file
|
||
self.price = price # 实际价格
|
||
self.original_price = original_price # 原价
|
||
self.local_executable_path = LOCAL_CHROME_PATH
|
||
self.locator_base = None
|
||
self.headless = headless
|
||
# click title to remove the focus.
|
||
# await self.locator_base.locator("h1:has-text('Upload video')").click()
|
||
|
||
async def handle_upload_error(self, page):
|
||
X_logger.info("闲鱼上传错误,正在重试...")
|
||
try:
|
||
# 重新查找"添加首图"元素并重试
|
||
upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]')
|
||
await upload_element.wait_for(state='visible', timeout=10000)
|
||
|
||
async with page.expect_file_chooser() as fc_info:
|
||
await upload_element.click()
|
||
file_chooser = await fc_info.value
|
||
await file_chooser.set_files(self.file_path)
|
||
X_logger.info("重试上传成功")
|
||
except Exception as e:
|
||
X_logger.error(f"重试上传失败: {e}")
|
||
|
||
async def upload(self, playwright: Playwright) -> None:
|
||
# 使用统一的反检测工具
|
||
browser = await create_stealth_browser(
|
||
playwright=playwright,
|
||
headless=self.headless,
|
||
executable_path=self.local_executable_path
|
||
)
|
||
|
||
context = await create_stealth_context(
|
||
browser=browser,
|
||
account_file=self.account_file,
|
||
headless=self.headless
|
||
)
|
||
|
||
page = await setup_stealth_page(context, "https://www.goofish.com/publish?spm=a21ybx.search.sidebar")
|
||
|
||
# 创建人类化输入包装器
|
||
human_typer = create_human_typer(page)
|
||
|
||
X_logger.info(f'[+]Uploading to 闲鱼-------{self.title}')
|
||
|
||
await page.wait_for_load_state('domcontentloaded', timeout=30000)
|
||
|
||
try:
|
||
# 等待页面加载完成
|
||
await page.wait_for_timeout(2000)
|
||
|
||
# 点击"添加首图"元素
|
||
#可以改成输入多张图片列表,for循环上传
|
||
#<div class="upload-item--VvK_FTdU"
|
||
# style="cursor: pointer;"
|
||
# data-spm-anchor-id="a21ybx.publish.0.i2.885c7310TfTQMu">
|
||
# <div class="upload-content--MU7R8CMf">
|
||
#style="object-fit: fill; width: 12px; height: 12px;">
|
||
# <span data-spm-anchor-id="a21ybx.publish.0.i8.885c7310TfTQMu">添加细节图</span></div></div>
|
||
upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]')
|
||
await upload_element.wait_for(state='visible', timeout=10000)
|
||
|
||
# 选择文件
|
||
async with page.expect_file_chooser() as fc_info:
|
||
await upload_element.click()
|
||
file_chooser = await fc_info.value
|
||
await file_chooser.set_files(self.file_path)
|
||
X_logger.info("成功上传首图文件")
|
||
|
||
# 填写价格信息
|
||
await self.add_price(page, human_typer)
|
||
|
||
# 添加标题和标签
|
||
await self.add_title_tags(page, human_typer)
|
||
|
||
# 发布内容
|
||
await self.click_publish(page)
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"上传过程中出现错误: {e}")
|
||
# 处理上传错误
|
||
await self.handle_upload_error(page)
|
||
|
||
await context.storage_state(path=f"{self.account_file}") # save cookie
|
||
X_logger.info(' [-] 更新cookie成功!')
|
||
await asyncio.sleep(2) # close delay for look the video status
|
||
# close all
|
||
await context.close()
|
||
await browser.close()
|
||
|
||
async def add_title_tags(self, page, human_typer):
|
||
try:
|
||
# 等待闲鱼页面加载完成
|
||
await page.wait_for_timeout(3000) # 等待页面加载
|
||
|
||
# 查找闲鱼的商品描述输入框(避免价格输入框)
|
||
description_selector = 'div.editor--MtHPS94K[data-placeholder*="描述一下宝贝的品牌型号"][contenteditable="true"]'
|
||
|
||
try:
|
||
text_area = page.locator(description_selector)
|
||
await text_area.wait_for(state='visible', timeout=10000)
|
||
|
||
# 构建完整内容(标题 + 标签)
|
||
content = self.title + "\n\n"
|
||
tags_text = " ".join([f"#{tag}" for tag in self.tags]) + " "
|
||
full_content = content + tags_text
|
||
|
||
# 使用人类化输入
|
||
success = await human_typer.type_text_human(
|
||
description_selector,
|
||
full_content,
|
||
clear_first=True
|
||
)
|
||
|
||
if not success:
|
||
X_logger.warning("商品描述人类化输入失败,使用传统方式")
|
||
# 点击描述输入框
|
||
await text_area.click()
|
||
await page.wait_for_timeout(1000)
|
||
|
||
# 清空现有内容
|
||
await page.keyboard.press("Control+A")
|
||
await page.keyboard.press("Delete")
|
||
|
||
# 输入标题和描述
|
||
await page.keyboard.insert_text(self.title)
|
||
await page.keyboard.press("Enter")
|
||
await page.keyboard.press("Enter")
|
||
|
||
# 添加标签
|
||
for index, tag in enumerate(self.tags, start=1):
|
||
X_logger.info(f"添加第 {index} 个标签: {tag}")
|
||
await page.keyboard.insert_text(f"#{tag} ")
|
||
await page.wait_for_timeout(500)
|
||
|
||
X_logger.info("成功添加商品描述和标签")
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"查找闲鱼描述输入框失败: {e}")
|
||
X_logger.warning("未找到商品描述输入框,跳过添加标题和标签")
|
||
# 调试用:暂停10秒,让你有时间观察页面上的标签是否正确输入
|
||
await page.wait_for_timeout(10000)
|
||
X_logger.info("观察时间结束,中断程序")
|
||
|
||
# 强制中断(后续代码不执行)
|
||
raise SystemExit("调试中断:标签输入流程完成")
|
||
except Exception as e:
|
||
X_logger.error(f"添加标题和标签时出错: {e}")
|
||
|
||
async def add_price(self, page, human_typer):
|
||
"""填写商品价格信息"""
|
||
try:
|
||
# 等待页面加载完成
|
||
await page.wait_for_timeout(2000)
|
||
|
||
# 查找并填写实际价格(第一个价格输入框)
|
||
if self.price:
|
||
try:
|
||
# 使用更精确的选择器来区分实际价格和原价
|
||
# 实际价格输入框通常在前面,通过索引区分
|
||
actual_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]')
|
||
actual_price_input = actual_price_inputs.first # 第一个是实际价格
|
||
|
||
await actual_price_input.wait_for(state='visible', timeout=10000)
|
||
|
||
# 使用人类化输入填写实际价格
|
||
success = await human_typer.type_text_human(
|
||
'input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]',
|
||
str(self.price),
|
||
clear_first=True
|
||
)
|
||
|
||
if not success:
|
||
X_logger.warning("实际价格人类化输入失败,使用传统方式")
|
||
await actual_price_input.click()
|
||
|
||
# 清空现有内容
|
||
await page.keyboard.press("Control+A")
|
||
await page.keyboard.press("Delete")
|
||
|
||
# 输入实际价格
|
||
await page.keyboard.insert_text(str(self.price))
|
||
|
||
X_logger.info(f"成功填写实际价格: {self.price}")
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"填写实际价格失败: {e}")
|
||
|
||
# 查找并填写原价(第二个价格输入框)
|
||
if self.original_price:
|
||
try:
|
||
# 等待一下,确保页面元素稳定
|
||
await page.wait_for_timeout(1000)
|
||
|
||
# 获取所有价格输入框,第二个是原价
|
||
original_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]')
|
||
original_price_input = original_price_inputs.nth(1) # 第二个是原价
|
||
|
||
await original_price_input.wait_for(state='visible', timeout=10000)
|
||
await original_price_input.click()
|
||
|
||
# 清空现有内容
|
||
await page.keyboard.press("Control+A")
|
||
await page.keyboard.press("Delete")
|
||
|
||
# 输入原价
|
||
await page.keyboard.insert_text(str(self.original_price))
|
||
X_logger.info(f"成功填写原价: {self.original_price}")
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"填写原价失败: {e}")
|
||
|
||
if not self.price and not self.original_price:
|
||
X_logger.warning("未设置价格信息,跳过价格填写")
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"填写价格时出错: {e}")
|
||
|
||
async def pobulish_setting(self, page):
|
||
"""发货设置"""
|
||
try:
|
||
await page.wait_for_timeout(2000)
|
||
#["包邮","按距离计费","一口价","无需邮寄"]
|
||
#if 一口价,则填写价格信息
|
||
#价格:<input placeholder="0.00" min="0" class="ant-input css-1ezqs0p" type="text" value="" data-spm-anchor-id="a21ybx.publish.0.i35.885c7310TfTQMu">
|
||
#是否自提
|
||
#location
|
||
except Exception as e:
|
||
X_logger.error(f"发货设置时出错: {e}")
|
||
|
||
|
||
async def classification(self, page):
|
||
"""分类"""
|
||
try:
|
||
await page.wait_for_timeout(2000)
|
||
#分类:<span class="ant-select-selection-item" title="其他闲置" data-spm-anchor-id="a21ybx.publish.0.i3.885c7310TfTQMu">其他闲置</span>
|
||
except Exception as e:
|
||
X_logger.error(f"分类时出错: {e}")
|
||
|
||
async def click_publish(self, page):
|
||
"""点击发布按钮发布商品"""
|
||
try:
|
||
# 等待页面稳定
|
||
await page.wait_for_timeout(2000)
|
||
|
||
# 使用具体的闲鱼发布按钮选择器
|
||
publish_button = page.locator('button.publish-button--KBpTVopQ:has-text("发布")')
|
||
await publish_button.wait_for(state='visible', timeout=10000)
|
||
|
||
# 点击发布按钮
|
||
await publish_button.click()
|
||
X_logger.info("成功点击发布按钮")
|
||
|
||
# 等待发布完成
|
||
await page.wait_for_timeout(3000)
|
||
X_logger.info("商品发布完成")
|
||
|
||
except Exception as e:
|
||
X_logger.error(f"点击发布按钮时出错: {e}")
|
||
# 备用方案:使用通用的发布按钮文本选择器
|
||
try:
|
||
backup_publish_button = page.locator('button:has-text("发布")')
|
||
await backup_publish_button.wait_for(state='visible', timeout=5000)
|
||
await backup_publish_button.click()
|
||
X_logger.info("使用备用选择器成功点击发布按钮")
|
||
except Exception as backup_e:
|
||
X_logger.error(f"备用发布方案也失败: {backup_e}")
|
||
|
||
|
||
async def main():
|
||
"""用于测试闲鱼上传功能的主函数"""
|
||
from pathlib import Path
|
||
|
||
# 配置文件路径
|
||
account_file = Path(__file__).parent.parent.parent / "cookies" / "xianyu_uploader" / "account.json"
|
||
account_file.parent.mkdir(exist_ok=True, parents=True)
|
||
|
||
# 首先确保cookie有效
|
||
if not await xianyu_setup(str(account_file), handle=True):
|
||
X_logger.error("Cookie设置失败")
|
||
return
|
||
|
||
# 创建测试图片对象(闲鱼主要上传商品图片)
|
||
image_path = Path(__file__).parent.parent.parent / "videos" / "demo.png"
|
||
|
||
if not image_path.exists():
|
||
X_logger.error(f"测试图片文件不存在: {image_path}")
|
||
return
|
||
|
||
xianyu_video = XianyuVideo(
|
||
title="测试闲鱼商品上传",
|
||
file_path=str(image_path),
|
||
tags=["测试", "闲鱼", "商品"],
|
||
publish_date=datetime.now(),
|
||
account_file=str(account_file),
|
||
price=99,
|
||
original_price=199,
|
||
headless=False
|
||
)
|
||
|
||
# 开始上传
|
||
async with async_playwright() as playwright:
|
||
await xianyu_video.upload(playwright)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
asyncio.run(main())
|