from pathlib import Path import sys import os # 添加项目根目录到 Python 路径 current_dir = Path(__file__).parent.resolve() project_root = current_dir.parent.parent sys.path.append(str(project_root)) from conf import LOCAL_CHROME_PATH, BASE_DIR import re from datetime import datetime from playwright.async_api import Playwright, async_playwright import os import asyncio #from uploader.tk_uploader.tk_config import Tk_Locator from utils.base_social_media import set_init_script from utils.files_times import get_absolute_path from utils.log import X_logger from utils.anti_detection import create_stealth_browser, create_stealth_context, setup_stealth_page from utils.human_typing_wrapper import create_human_typer async def cookie_auth(account_file): async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=True) context = await browser.new_context(storage_state=account_file) context = await set_init_script(context) # 创建一个新的页面 page = await context.new_page() # 访问指定的 URL await page.goto("https://www.goofish.com/search") #await page.wait_for_load_state('networkidle') await page.wait_for_load_state('domcontentloaded', timeout=30000) try: # 选择所有的 select 元素 select_elements = await page.query_selector_all('select') for element in select_elements: class_name = await element.get_attribute('class') # 使用正则表达式匹配特定模式的 class 名称 if re.match(r'ins-.*-SelectFormContainer.*', class_name): X_logger.error("[+] cookie expired") return False X_logger.success("[+] cookie valid") return True except: X_logger.success("[+] cookie valid") return True async def xianyu_setup(account_file, handle=False): account_file = get_absolute_path(account_file, "https://www.goofish.com/search") if not os.path.exists(account_file) or not await cookie_auth(account_file): if not handle: return False X_logger.info('[+] cookie file is not existed or expired. Now open the browser auto. Please login with your way(gmail phone, whatever, the cookie file will generated after login') await get_xianyu_cookie(account_file) return True async def get_xianyu_cookie(account_file): async with async_playwright() as playwright: options = { 'args': [ '--lang en-GB', ], 'headless': False, # Set headless option here } # Make sure to run headed. browser = await playwright.chromium.launch(**options) # Setup context however you like. context = await browser.new_context() # Pass any options context = await set_init_script(context) # Pause the page, and start recording manually. page = await context.new_page() await page.goto("https://www.goofish.com/search") await page.pause() # 点击调试器的继续,保存cookie await context.storage_state(path=account_file) # if __name__ == '__main__': # account_file = Path(BASE_DIR / "cookies" / "xianyu_uploader" / "account.json") # account_file.parent.mkdir(exist_ok=True) # cookie_setup = asyncio.run(xianyu_setup(str(account_file), handle=True)) class XianyuVideo(object): def __init__(self, title, file_path, tags, publish_date, account_file, thumbnail_path=None, price=None, original_price=None,headless=True): self.title = title self.file_path = file_path self.tags = tags self.publish_date = publish_date self.thumbnail_path = thumbnail_path self.account_file = account_file self.price = price # 实际价格 self.original_price = original_price # 原价 self.local_executable_path = LOCAL_CHROME_PATH self.locator_base = None self.headless = headless # click title to remove the focus. # await self.locator_base.locator("h1:has-text('Upload video')").click() async def handle_upload_error(self, page): X_logger.info("闲鱼上传错误,正在重试...") try: # 重新查找"添加首图"元素并重试 upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]') await upload_element.wait_for(state='visible', timeout=10000) async with page.expect_file_chooser() as fc_info: await upload_element.click() file_chooser = await fc_info.value await file_chooser.set_files(self.file_path) X_logger.info("重试上传成功") except Exception as e: X_logger.error(f"重试上传失败: {e}") async def upload(self, playwright: Playwright) -> None: # 使用统一的反检测工具 browser = await create_stealth_browser( playwright=playwright, headless=self.headless, executable_path=self.local_executable_path ) context = await create_stealth_context( browser=browser, account_file=self.account_file, headless=self.headless ) page = await setup_stealth_page(context, "https://www.goofish.com/publish?spm=a21ybx.search.sidebar") # 创建人类化输入包装器 human_typer = create_human_typer(page) X_logger.info(f'[+]Uploading to 闲鱼-------{self.title}') await page.wait_for_load_state('domcontentloaded', timeout=30000) try: # 等待页面加载完成 await page.wait_for_timeout(2000) # 点击"添加首图"元素 #可以改成输入多张图片列表,for循环上传 #
#
#style="object-fit: fill; width: 12px; height: 12px;"> # 添加细节图
upload_element = page.locator('div.upload-item--VvK_FTdU[style*="cursor: pointer;"]') await upload_element.wait_for(state='visible', timeout=10000) # 选择文件 async with page.expect_file_chooser() as fc_info: await upload_element.click() file_chooser = await fc_info.value await file_chooser.set_files(self.file_path) X_logger.info("成功上传首图文件") # 填写价格信息 await self.add_price(page, human_typer) # 添加标题和标签 await self.add_title_tags(page, human_typer) # 发布内容 await self.click_publish(page) except Exception as e: X_logger.error(f"上传过程中出现错误: {e}") # 处理上传错误 await self.handle_upload_error(page) await context.storage_state(path=f"{self.account_file}") # save cookie X_logger.info(' [-] 更新cookie成功!') await asyncio.sleep(2) # close delay for look the video status # close all await context.close() await browser.close() async def add_title_tags(self, page, human_typer): try: # 等待闲鱼页面加载完成 await page.wait_for_timeout(3000) # 等待页面加载 # 查找闲鱼的商品描述输入框(避免价格输入框) description_selector = 'div.editor--MtHPS94K[data-placeholder*="描述一下宝贝的品牌型号"][contenteditable="true"]' try: text_area = page.locator(description_selector) await text_area.wait_for(state='visible', timeout=10000) # 构建完整内容(标题 + 标签) content = self.title + "\n\n" tags_text = " ".join([f"#{tag}" for tag in self.tags]) + " " full_content = content + tags_text # 使用人类化输入 success = await human_typer.type_text_human( description_selector, full_content, clear_first=True ) if not success: X_logger.warning("商品描述人类化输入失败,使用传统方式") # 点击描述输入框 await text_area.click() await page.wait_for_timeout(1000) # 清空现有内容 await page.keyboard.press("Control+A") await page.keyboard.press("Delete") # 输入标题和描述 await page.keyboard.insert_text(self.title) await page.keyboard.press("Enter") await page.keyboard.press("Enter") # 添加标签 for index, tag in enumerate(self.tags, start=1): X_logger.info(f"添加第 {index} 个标签: {tag}") await page.keyboard.insert_text(f"#{tag} ") await page.wait_for_timeout(500) X_logger.info("成功添加商品描述和标签") except Exception as e: X_logger.error(f"查找闲鱼描述输入框失败: {e}") X_logger.warning("未找到商品描述输入框,跳过添加标题和标签") # 调试用:暂停10秒,让你有时间观察页面上的标签是否正确输入 await page.wait_for_timeout(10000) X_logger.info("观察时间结束,中断程序") # 强制中断(后续代码不执行) raise SystemExit("调试中断:标签输入流程完成") except Exception as e: X_logger.error(f"添加标题和标签时出错: {e}") async def add_price(self, page, human_typer): """填写商品价格信息""" try: # 等待页面加载完成 await page.wait_for_timeout(2000) # 查找并填写实际价格(第一个价格输入框) if self.price: try: # 使用更精确的选择器来区分实际价格和原价 # 实际价格输入框通常在前面,通过索引区分 actual_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]') actual_price_input = actual_price_inputs.first # 第一个是实际价格 await actual_price_input.wait_for(state='visible', timeout=10000) # 使用人类化输入填写实际价格 success = await human_typer.type_text_human( 'input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]', str(self.price), clear_first=True ) if not success: X_logger.warning("实际价格人类化输入失败,使用传统方式") await actual_price_input.click() # 清空现有内容 await page.keyboard.press("Control+A") await page.keyboard.press("Delete") # 输入实际价格 await page.keyboard.insert_text(str(self.price)) X_logger.info(f"成功填写实际价格: {self.price}") except Exception as e: X_logger.error(f"填写实际价格失败: {e}") # 查找并填写原价(第二个价格输入框) if self.original_price: try: # 等待一下,确保页面元素稳定 await page.wait_for_timeout(1000) # 获取所有价格输入框,第二个是原价 original_price_inputs = page.locator('input.ant-input.css-1ezqs0p[placeholder="0.00"][min="0"]') original_price_input = original_price_inputs.nth(1) # 第二个是原价 await original_price_input.wait_for(state='visible', timeout=10000) await original_price_input.click() # 清空现有内容 await page.keyboard.press("Control+A") await page.keyboard.press("Delete") # 输入原价 await page.keyboard.insert_text(str(self.original_price)) X_logger.info(f"成功填写原价: {self.original_price}") except Exception as e: X_logger.error(f"填写原价失败: {e}") if not self.price and not self.original_price: X_logger.warning("未设置价格信息,跳过价格填写") except Exception as e: X_logger.error(f"填写价格时出错: {e}") async def pobulish_setting(self, page): """发货设置""" try: await page.wait_for_timeout(2000) #["包邮","按距离计费","一口价","无需邮寄"] #if 一口价,则填写价格信息 #价格: #是否自提 #location except Exception as e: X_logger.error(f"发货设置时出错: {e}") async def classification(self, page): """分类""" try: await page.wait_for_timeout(2000) #分类:其他闲置 except Exception as e: X_logger.error(f"分类时出错: {e}") async def click_publish(self, page): """点击发布按钮发布商品""" try: # 等待页面稳定 await page.wait_for_timeout(2000) # 使用具体的闲鱼发布按钮选择器 publish_button = page.locator('button.publish-button--KBpTVopQ:has-text("发布")') await publish_button.wait_for(state='visible', timeout=10000) # 点击发布按钮 await publish_button.click() X_logger.info("成功点击发布按钮") # 等待发布完成 await page.wait_for_timeout(3000) X_logger.info("商品发布完成") except Exception as e: X_logger.error(f"点击发布按钮时出错: {e}") # 备用方案:使用通用的发布按钮文本选择器 try: backup_publish_button = page.locator('button:has-text("发布")') await backup_publish_button.wait_for(state='visible', timeout=5000) await backup_publish_button.click() X_logger.info("使用备用选择器成功点击发布按钮") except Exception as backup_e: X_logger.error(f"备用发布方案也失败: {backup_e}") async def main(): """用于测试闲鱼上传功能的主函数""" from pathlib import Path # 配置文件路径 account_file = Path(__file__).parent.parent.parent / "cookies" / "xianyu_uploader" / "account.json" account_file.parent.mkdir(exist_ok=True, parents=True) # 首先确保cookie有效 if not await xianyu_setup(str(account_file), handle=True): X_logger.error("Cookie设置失败") return # 创建测试图片对象(闲鱼主要上传商品图片) image_path = Path(__file__).parent.parent.parent / "videos" / "demo.png" if not image_path.exists(): X_logger.error(f"测试图片文件不存在: {image_path}") return xianyu_video = XianyuVideo( title="测试闲鱼商品上传", file_path=str(image_path), tags=["测试", "闲鱼", "商品"], publish_date=datetime.now(), account_file=str(account_file), price=99, original_price=199, headless=False ) # 开始上传 async with async_playwright() as playwright: await xianyu_video.upload(playwright) if __name__ == '__main__': asyncio.run(main())