401 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
import sys
import os
# 添加项目根目录到 Python 路径
current_dir = Path(__file__).parent.resolve()
project_root = current_dir.parent.parent
sys.path.append(str(project_root))
from conf import LOCAL_CHROME_PATH, BASE_DIR
import re
from datetime import datetime
from playwright.async_api import Playwright, async_playwright
import os
import asyncio
#from uploader.tk_uploader.tk_config import Tk_Locator
from utils.base_social_media import set_init_script
from utils.files_times import get_absolute_path
from utils.log import X_logger
from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page
from utils.human_typing_wrapper import create_human_typer
async def cookie_auth(account_file):
async with async_playwright() as playwright:
browser = await playwright.chromium.launch(headless=True)
context = await browser.new_context(storage_state=account_file)
context = await set_init_script(context)
# 创建一个新的页面
page = await context.new_page()
# 访问指定的 URL
await page.goto("https://www.goofish.com/search")
#await page.wait_for_load_state('networkidle')
await page.wait_for_load_state('domcontentloaded', timeout=30000)
try:
# 选择所有的 select 元素
select_elements = await page.query_selector_all('select')
for element in select_elements:
class_name = await element.get_attribute('class')
# 使用正则表达式匹配特定模式的 class 名称
if re.match(r'ins-.*-SelectFormContainer.*', class_name):
X_logger.error("[+] cookie expired")
return False
X_logger.success("[+] cookie valid")
return True
except:
X_logger.success("[+] cookie valid")
return True
async def xianyu_setup(account_file, handle=False):
account_file = get_absolute_path(account_file, "https://www.goofish.com/search")
if not os.path.exists(account_file) or not await cookie_auth(account_file):
if not handle:
return False
X_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login')
await get_xianyu_cookie(account_file)
return True
async def get_xianyu_cookie(account_file):
async with async_playwright() as playwright:
options = {
'args': [
'--lang en-GB',
],
'headless': False, # Set headless option here
}
# Make sure to run headed.
browser = await playwright.chromium.launch(**options)
# Setup context however you like.
context = await browser.new_context() # Pass any options
context = await set_init_script(context)
# Pause the page, and start recording manually.
page = await context.new_page()
await page.goto("https://www.goofish.com/search")
await page.pause()
# 点击调试器的继续保存cookie
await context.storage_state(path=account_file)
# if __name__ == '__main__':
# account_file = Path(BASE_DIR / "cookies" / "xianyu_uploader" / "account.json")
# account_file.parent.mkdir(exist_ok=True)
# cookie_setup = asyncio.run(xianyu_setup(str(account_file), handle=True))
class XianyuVideo(object):
def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None, price=None, original_price=None,headless=True):
self.title = title
self.file_path = file_path
self.tags = tags
self.publish_date = publish_date
self.thumbnail_path = thumbnail_path
self.account_file = account_file
self.price = price # 实际价格
self.original_price = original_price # 原价
self.local_executable_path = LOCAL_CHROME_PATH
self.locator_base = None
self.headless = headless
# click title to remove the focus.
# await self.locator_base.locator("h1:has-text('Upload video')").click()
async def handle_upload_error(self, page):
X_logger.info("闲鱼上传错误,正在重试...")
try:
# 重新查找"添加首图"元素并重试
upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]')
await upload_element.wait_for(state='visible', timeout=10000)
async with page.expect_file_chooser() as fc_info:
await upload_element.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
X_logger.info("重试上传成功")
except Exception as e:
X_logger.error(f"重试上传失败: {e}")
async def upload(self, playwright: Playwright) -> None:
# 使用统一的反检测工具
browser = await create_stealth_browser(
playwright=playwright,
headless=self.headless,
executable_path=self.local_executable_path
)
context = await create_stealth_context(
browser=browser,
account_file=self.account_file,
headless=self.headless
)
page = await setup_stealth_page(context, "https://www.goofish.com/publish?spm=a21ybx.search.sidebar")
# 创建人类化输入包装器
human_typer = create_human_typer(page)
X_logger.info(f'[+]Uploading to 闲鱼-------{self.title}')
await page.wait_for_load_state('domcontentloaded', timeout=30000)
try:
# 等待页面加载完成
await page.wait_for_timeout(2000)
# 点击"添加首图"元素
#可以改成输入多张图片列表for循环上传
#<div class="upload-item--VvK_FTdU"
# style="cursor: pointer;"
# data-spm-anchor-id="a21ybx.publish.0.i2.885c7310TfTQMu">
# <div class="upload-content--MU7R8CMf">
#style="object-fit: fill; width: 12px; height: 12px;">
# <span data-spm-anchor-id="a21ybx.publish.0.i8.885c7310TfTQMu">添加细节图</span></div></div>
upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]')
await upload_element.wait_for(state='visible', timeout=10000)
# 选择文件
async with page.expect_file_chooser() as fc_info:
await upload_element.click()
file_chooser = await fc_info.value
await file_chooser.set_files(self.file_path)
X_logger.info("成功上传首图文件")
# 填写价格信息
await self.add_price(page, human_typer)
# 添加标题和标签
await self.add_title_tags(page, human_typer)
# 发布内容
await self.click_publish(page)
except Exception as e:
X_logger.error(f"上传过程中出现错误: {e}")
# 处理上传错误
await self.handle_upload_error(page)
await context.storage_state(path=f"{self.account_file}") # save cookie
X_logger.info(' [-] 更新cookie成功')
await asyncio.sleep(2) # close delay for look the video status
# close all
await context.close()
await browser.close()
async def add_title_tags(self, page, human_typer):
try:
# 等待闲鱼页面加载完成
await page.wait_for_timeout(3000) # 等待页面加载
# 查找闲鱼的商品描述输入框(避免价格输入框)
description_selector = 'div.editor--MtHPS94K[data-placeholder*="描述一下宝贝的品牌型号"][contenteditable="true"]'
try:
text_area = page.locator(description_selector)
await text_area.wait_for(state='visible', timeout=10000)
# 构建完整内容(标题 + 标签)
content = self.title + "\n\n"
tags_text = " ".join([f"#{tag}" for tag in self.tags]) + " "
full_content = content + tags_text
# 使用人类化输入
success = await human_typer.type_text_human(
description_selector,
full_content,
clear_first=True
)
if not success:
X_logger.warning("商品描述人类化输入失败,使用传统方式")
# 点击描述输入框
await text_area.click()
await page.wait_for_timeout(1000)
# 清空现有内容
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
# 输入标题和描述
await page.keyboard.insert_text(self.title)
await page.keyboard.press("Enter")
await page.keyboard.press("Enter")
# 添加标签
for index, tag in enumerate(self.tags, start=1):
X_logger.info(f"添加第 {index} 个标签: {tag}")
await page.keyboard.insert_text(f"#{tag} ")
await page.wait_for_timeout(500)
X_logger.info("成功添加商品描述和标签")
except Exception as e:
X_logger.error(f"查找闲鱼描述输入框失败: {e}")
X_logger.warning("未找到商品描述输入框,跳过添加标题和标签")
# 调试用暂停10秒让你有时间观察页面上的标签是否正确输入
await page.wait_for_timeout(10000)
X_logger.info("观察时间结束,中断程序")
# 强制中断(后续代码不执行)
raise SystemExit("调试中断:标签输入流程完成")
except Exception as e:
X_logger.error(f"添加标题和标签时出错: {e}")
async def add_price(self, page, human_typer):
"""填写商品价格信息"""
try:
# 等待页面加载完成
await page.wait_for_timeout(2000)
# 查找并填写实际价格(第一个价格输入框)
if self.price:
try:
# 使用更精确的选择器来区分实际价格和原价
# 实际价格输入框通常在前面,通过索引区分
actual_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]')
actual_price_input = actual_price_inputs.first # 第一个是实际价格
await actual_price_input.wait_for(state='visible', timeout=10000)
# 使用人类化输入填写实际价格
success = await human_typer.type_text_human(
'input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]',
str(self.price),
clear_first=True
)
if not success:
X_logger.warning("实际价格人类化输入失败,使用传统方式")
await actual_price_input.click()
# 清空现有内容
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
# 输入实际价格
await page.keyboard.insert_text(str(self.price))
X_logger.info(f"成功填写实际价格: {self.price}")
except Exception as e:
X_logger.error(f"填写实际价格失败: {e}")
# 查找并填写原价(第二个价格输入框)
if self.original_price:
try:
# 等待一下,确保页面元素稳定
await page.wait_for_timeout(1000)
# 获取所有价格输入框,第二个是原价
original_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]')
original_price_input = original_price_inputs.nth(1) # 第二个是原价
await original_price_input.wait_for(state='visible', timeout=10000)
await original_price_input.click()
# 清空现有内容
await page.keyboard.press("Control+A")
await page.keyboard.press("Delete")
# 输入原价
await page.keyboard.insert_text(str(self.original_price))
X_logger.info(f"成功填写原价: {self.original_price}")
except Exception as e:
X_logger.error(f"填写原价失败: {e}")
if not self.price and not self.original_price:
X_logger.warning("未设置价格信息,跳过价格填写")
except Exception as e:
X_logger.error(f"填写价格时出错: {e}")
async def pobulish_setting(self, page):
"""发货设置"""
try:
await page.wait_for_timeout(2000)
#["包邮""按距离计费""一口价""无需邮寄"]
#if 一口价,则填写价格信息
#价格:<input placeholder="0.00" min="0" class="ant-input css-1ezqs0p" type="text" value="" data-spm-anchor-id="a21ybx.publish.0.i35.885c7310TfTQMu">
#是否自提
#location
except Exception as e:
X_logger.error(f"发货设置时出错: {e}")
async def classification(self, page):
"""分类"""
try:
await page.wait_for_timeout(2000)
#分类:<span class="ant-select-selection-item" title="其他闲置" data-spm-anchor-id="a21ybx.publish.0.i3.885c7310TfTQMu">其他闲置</span>
except Exception as e:
X_logger.error(f"分类时出错: {e}")
async def click_publish(self, page):
"""点击发布按钮发布商品"""
try:
# 等待页面稳定
await page.wait_for_timeout(2000)
# 使用具体的闲鱼发布按钮选择器
publish_button = page.locator('button.publish-button--KBpTVopQ:has-text("发布")')
await publish_button.wait_for(state='visible', timeout=10000)
# 点击发布按钮
await publish_button.click()
X_logger.info("成功点击发布按钮")
# 等待发布完成
await page.wait_for_timeout(3000)
X_logger.info("商品发布完成")
except Exception as e:
X_logger.error(f"点击发布按钮时出错: {e}")
# 备用方案:使用通用的发布按钮文本选择器
try:
backup_publish_button = page.locator('button:has-text("发布")')
await backup_publish_button.wait_for(state='visible', timeout=5000)
await backup_publish_button.click()
X_logger.info("使用备用选择器成功点击发布按钮")
except Exception as backup_e:
X_logger.error(f"备用发布方案也失败: {backup_e}")
async def main():
"""用于测试闲鱼上传功能的主函数"""
from pathlib import Path
# 配置文件路径
account_file = Path(__file__).parent.parent.parent / "cookies" / "xianyu_uploader" / "account.json"
account_file.parent.mkdir(exist_ok=True, parents=True)
# 首先确保cookie有效
if not await xianyu_setup(str(account_file), handle=True):
X_logger.error("Cookie设置失败")
return
# 创建测试图片对象(闲鱼主要上传商品图片)
image_path = Path(__file__).parent.parent.parent / "videos" / "demo.png"
if not image_path.exists():
X_logger.error(f"测试图片文件不存在: {image_path}")
return
xianyu_video = XianyuVideo(
title="测试闲鱼商品上传",
file_path=str(image_path),
tags=["测试", "闲鱼", "商品"],
publish_date=datetime.now(),
account_file=str(account_file),
price=99,
original_price=199,
headless=False
)
# 开始上传
async with async_playwright() as playwright:
await xianyu_video.upload(playwright)
if __name__ == '__main__':
asyncio.run(main())