diff --git a/scripts/distribution/distribute_content.py b/scripts/distribution/distribute_content.py old mode 100644 new mode 100755 index 922607c..9ae6340 --- a/scripts/distribution/distribute_content.py +++ b/scripts/distribution/distribute_content.py @@ -83,6 +83,9 @@ def parse_arguments(): help='每封邮件发送之间的延迟时间(秒)') parser.add_argument('--zip-filename', type=str, default=None, help='指定ZIP压缩包的基本文件名(不含扩展名),如"文旅小红书带货笔记内容0512"') + # 新增强制性附件参数 + parser.add_argument('--force-attachments', type=str, default=None, + help='强制添加的附件文件路径,多个文件用逗号分隔') return parser.parse_args() @@ -225,6 +228,8 @@ def allocate_content_to_users(users_df, content_df, article_per_user): try: # 创建用户列表 users = [] + users_with_insufficient_content = [] # 新增:跟踪内容不足的用户 + for _, row in users_df.iterrows(): users.append({ 'id': row['id'], @@ -237,7 +242,7 @@ def allocate_content_to_users(users_df, content_df, article_per_user): content_list = content_df.to_dict('records') if not content_list: logger.warning("没有可用内容进行分配") - return [] + return [], [] # 随机打乱内容列表 random.shuffle(content_list) @@ -248,10 +253,22 @@ def allocate_content_to_users(users_df, content_df, article_per_user): for user in users: user_contents = [] + expected_count = article_per_user # 预期内容数量 + for _ in range(article_per_user): if content_index >= len(content_list): - content_index = 0 # 如果内容不够,循环使用 - logger.warning("内容不足,将循环使用现有内容") + # 不再循环使用内容,而是停止分配 + logger.warning("内容不足,只分配可用内容") + # 如果用户获得的内容少于预期,记录下来 + if len(user_contents) < expected_count: + users_with_insufficient_content.append({ + 'user_id': user['id'], + 'email': user['email'], + 'username': user['username'], + 'expected_content': expected_count, + 'actual_content': len(user_contents) + }) + break content = content_list[content_index] user_contents.append(content) @@ -259,15 +276,29 @@ def allocate_content_to_users(users_df, content_df, article_per_user): content_index += 1 user['contents'] = user_contents + + # 检查是否获得了预期数量的内容 + if len(user_contents) < expected_count: + users_with_insufficient_content.append({ + 'user_id': user['id'], + 'email': user['email'], + 'username': user['username'], + 'expected_content': expected_count, + 'actual_content': len(user_contents) + }) logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容") unique_content_count = len(set([c['EntryID'] for c in content_allocated])) logger.info(f"分配的唯一内容条数: {unique_content_count}") - return users + if users_with_insufficient_content: + logger.warning(f"有 {len(users_with_insufficient_content)} 个用户获得的内容少于预期数量") + + # 返回用户列表和内容不足的用户列表 + return users, users_with_insufficient_content except Exception as e: logger.error(f"分配内容失败: {e}") - return [] + return [], [] def find_additional_images(file_path): """查找与文章相关的所有额外图片""" @@ -296,7 +327,7 @@ def find_additional_images(file_path): logger.info(f"找到 {len(image_files)} 张额外图片: {additional_images_dir}") return image_files -def prepare_distribution_csv(users_with_content, output_dir): +def prepare_distribution_csv(users_with_content, output_dir, users_with_insufficient_content=None): """准备分发CSV文件""" try: os.makedirs(output_dir, exist_ok=True) @@ -331,6 +362,14 @@ def prepare_distribution_csv(users_with_content, output_dir): df.to_csv(distribution_csv, index=False) logger.info(f"分发CSV已保存到: {distribution_csv}") + + # 如果有内容不足的用户,导出到CSV + if users_with_insufficient_content: + insufficient_df = pd.DataFrame(users_with_insufficient_content) + insufficient_csv = os.path.join(output_dir, f"insufficient_content_users_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") + insufficient_df.to_csv(insufficient_csv, index=False) + logger.info(f"内容不足的用户列表已保存到: {insufficient_csv}") + return distribution_csv except Exception as e: logger.error(f"准备分发CSV失败: {e}") @@ -399,7 +438,7 @@ def send_single_email(from_addr, password, to_addr, subject, content, attachment return False def send_emails(distribution_csv, output_dir, email_from, email_password, - subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None, test_email=None): + subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None, test_email=None, force_attachments=None): """发送邮件""" try: # 确保输出目录存在 @@ -413,36 +452,63 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, if test_email: logger.info(f"测试邮箱模式已启用,所有邮件将发送到: {test_email}") - # 按邮箱分组 - email_groups = {} - for _, row in df.iterrows(): - email = row['email'] - if email not in email_groups: - email_groups[email] = [] - email_groups[email].append(row) + # 处理强制性附件 + forced_attachments = [] + if force_attachments: + for attachment_path in force_attachments.split(','): + attachment_path = attachment_path.strip() + if os.path.exists(attachment_path): + if os.path.isdir(attachment_path): + # 如果是目录,添加目录下所有文件 + for root, _, files in os.walk(attachment_path): + for file in files: + file_path = os.path.join(root, file) + forced_attachments.append(file_path) + logger.info(f"添加强制性附件目录文件: {file_path}") + else: + # 如果是文件,直接添加 + forced_attachments.append(attachment_path) + logger.info(f"添加强制性附件: {attachment_path}") + else: + logger.warning(f"强制性附件不存在,跳过: {attachment_path}") + + logger.info(f"共添加 {len(forced_attachments)} 个强制性附件") - logger.info(f"共有 {len(email_groups)} 个邮箱需要发送") + # 按用户ID分组而不是按邮箱分组,确保即使邮箱相同也会发送多封邮件 + user_groups = {} + for _, row in df.iterrows(): + user_id = row['user_id'] + if user_id not in user_groups: + user_groups[user_id] = [] + user_groups[user_id].append(row) + + logger.info(f"共有 {len(user_groups)} 个用户需要发送") # 结果记录 results = { - "total": len(email_groups), + "total": len(user_groups), "success": 0, "failed": 0, "details": [] } # 发送邮件 - for email, rows in email_groups.items(): + for user_id, rows in user_groups.items(): try: - logger.info(f"处理邮箱: {email}") + # 获取用户邮箱 + email = rows[0]['email'] + logger.info(f"处理用户: {user_id}, 邮箱: {email}") # 收集所有文件路径 files = [] article_contents = [] + valid_article_count = 0 for row in rows: # 收集文本文件并读取内容 file_path = row.get('file_path') + has_article_content = False + if file_path and not pd.isna(file_path) and os.path.exists(file_path): files.append(file_path) @@ -450,9 +516,10 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, try: with open(file_path, 'r', encoding='utf-8') as f: article_content = f.read() - # product_name = row.get('product', '') - # object_name = row.get('object', '') - article_contents.append(f"\n\n{article_content}\n\n{'='*50}\n\n") + if article_content.strip(): # 确保内容不为空 + article_contents.append(f"\n\n{article_content}\n\n{'='*50}\n\n") + has_article_content = True + valid_article_count += 1 except Exception as e: logger.warning(f"读取文章内容失败: {file_path}, 错误: {e}") @@ -468,13 +535,77 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, if img_path.strip() and os.path.exists(img_path): files.append(img_path) - if not files: - logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过") + # 添加强制性附件 + if forced_attachments: + files.extend(forced_attachments) + + # 检查是否有有效的文件和文章内容 + if not files or valid_article_count == 0: + logger.warning(f"用户 {user_id} (邮箱: {email}) 没有有效的文件或文章内容,尝试查找替代内容") + + # 尝试查找替代内容 + try: + # 连接数据库 + db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db' + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + + # 查询一篇审核通过但未分发的内容 + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM contents + WHERE judge_status = 1 AND is_distributed = 0 + LIMIT 1 + """) + + alt_content = cursor.fetchone() + + if alt_content: + logger.info(f"找到替代内容: {alt_content['entry_id']}") + + # 清空之前的内容 + files = [] + article_contents = [] + + # 添加替代内容 + alt_file_path = alt_content['output_txt_path'] + if alt_file_path and os.path.exists(alt_file_path): + files.append(alt_file_path) + # 读取文章内容 + try: + with open(alt_file_path, 'r', encoding='utf-8') as f: + alt_article_content = f.read() + article_contents.append(f"\n\n{alt_article_content}\n\n{'='*50}\n\n") + valid_article_count = 1 + except Exception as e: + logger.warning(f"读取替代内容失败: {alt_file_path}, 错误: {e}") + + # 添加海报和额外图片 + alt_poster_path = alt_content['poster_path'] + if alt_poster_path and os.path.exists(alt_poster_path): + files.append(alt_poster_path) + + # 寻找额外图片 + alt_additional_images = find_additional_images(alt_file_path) + for img_path in alt_additional_images: + if os.path.exists(img_path): + files.append(img_path) + else: + logger.warning("未找到替代内容") + + conn.close() + + except Exception as e: + logger.error(f"查找替代内容时出错: {e}") + + # 再次检查是否有有效的文件和内容 + if not files or valid_article_count == 0: + logger.warning(f"用户 {user_id} (邮箱: {email}) 没有有效的附件文件或文章内容,跳过") results["details"].append({ - "user_id": rows[0]['user_id'], + "user_id": user_id, "email": email, "status": "skipped", - "reason": "没有有效的附件文件", + "reason": "没有有效的附件文件或文章内容", "files": [] }) results["failed"] += 1 @@ -482,14 +613,14 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, # 构建邮件内容 files_count = len(files) - entries_count = len(rows) + entries_count = valid_article_count # 基本邮件内容 base_email_content = f""" 您好!请查收今日带货笔记(文案+配图),内容在邮件正文和附件中。具体挂载商品等操作流程请查看对应达人微信群内信息。 优化建议: -①所有提供的材料仅为参考,支持自行修改 +①所有提供的材料仅为参考,支持自行修改(您可以参考我们整理给您的<小红书爆款标题公式文件>针对标题进行修改仿写) ②标题优化和格式的规整更有利于带来小眼睛 ③本测试提供的商品均为近期高佣金筛选后的优质商单,质量保证 @@ -502,9 +633,9 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, # 发送邮件 if test_mode: - logger.info(f"测试模式: 模拟发送邮件到 {email}") + logger.info(f"测试模式: 模拟发送邮件到用户 {user_id} (邮箱: {email})") results["details"].append({ - "user_id": rows[0]['user_id'], + "user_id": user_id, "email": email, "status": "success (test mode)", "files": files @@ -532,7 +663,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, if success: results["success"] += 1 results["details"].append({ - "user_id": rows[0]['user_id'], + "user_id": user_id, "email": email, "status": "success", "files": files @@ -540,7 +671,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, else: results["failed"] += 1 results["details"].append({ - "user_id": rows[0]['user_id'], + "user_id": user_id, "email": email, "status": "failed", "reason": "SMTP发送失败", @@ -553,9 +684,10 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, time.sleep(delay) except Exception as e: - logger.error(f"处理邮箱 {email} 时出错: {e}") + logger.error(f"处理用户 {user_id} (邮箱: {email}) 时出错: {e}") results["failed"] += 1 results["details"].append({ + "user_id": user_id, "email": email, "status": "failed", "reason": str(e), @@ -564,9 +696,9 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, # 打印发送结果摘要 logger.info("\n===== 邮件发送结果摘要 =====") - logger.info(f"总计: {results['total']} 个邮箱") - logger.info(f"成功: {results['success']} 个邮箱") - logger.info(f"失败: {results['failed']} 个邮箱") + logger.info(f"总计: {results['total']} 个用户") + logger.info(f"成功: {results['success']} 个用户") + logger.info(f"失败: {results['failed']} 个用户") # 将结果写入CSV文件 try: @@ -651,7 +783,7 @@ def record_distribution_to_database(conn, distribution_csv, send_result_csv): conn.rollback() return False -def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir): +def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir, insufficient_content_users=None): """生成分发报告""" try: # 读取分发CSV @@ -744,6 +876,48 @@ def generate_distribution_report(conn, distribution_csv, send_result_csv, output with open(stats_json, 'w') as f: json.dump(article_stats, f, indent=2) + # 如果有内容不足的用户,重新导出一份详细报告 + if insufficient_content_users: + detailed_insufficient_users = [] + + # 获取收件用户的实际发送情况 + user_send_stats = {} + for _, row in dist_df.iterrows(): + user_id = row['user_id'] + if user_id not in user_send_stats: + user_send_stats[user_id] = { + 'email': row['email'], + 'username': row['username'], + 'sent_count': 0, + 'success_count': 0 + } + + user_send_stats[user_id]['sent_count'] += 1 + if 'success' in str(row.get('send_status', '')).lower(): + user_send_stats[user_id]['success_count'] += 1 + + # 合并内容不足用户的实际发送情况 + for user in insufficient_content_users: + user_id = user['user_id'] + send_info = user_send_stats.get(user_id, {'sent_count': 0, 'success_count': 0}) + + detailed_insufficient_users.append({ + 'user_id': user['user_id'], + 'email': user['email'], + 'username': user['username'], + 'expected_content': user['expected_content'], + 'allocated_content': user['actual_content'], + 'sent_count': send_info['sent_count'], + 'success_count': send_info['success_count'] + }) + + # 导出详细的内容不足用户报告 + if detailed_insufficient_users: + detailed_df = pd.DataFrame(detailed_insufficient_users) + detailed_csv = os.path.join(output_dir, f"insufficient_content_users_detailed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") + detailed_df.to_csv(detailed_csv, index=False) + logger.info(f"内容不足用户的详细报告已保存到: {detailed_csv}") + # 统计摘要 cursor.execute("SELECT COUNT(*) FROM contents") total_articles = cursor.fetchone()[0] @@ -759,6 +933,10 @@ def generate_distribution_report(conn, distribution_csv, send_result_csv, output logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)") logger.info(f"本次成功发送文章数: {success_sent}") + # 内容不足的用户统计 + if insufficient_content_users: + logger.info(f"内容不足的用户数: {len(insufficient_content_users)}") + # 按产品统计 cursor.execute(""" SELECT product, @@ -822,7 +1000,7 @@ def main(): return # 为用户分配内容 - users_with_content = allocate_content_to_users( + users_with_content, insufficient_content_users = allocate_content_to_users( users_df, content_df, args.article_per_user ) @@ -831,7 +1009,7 @@ def main(): return # 准备分发CSV - distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir) + distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir, insufficient_content_users) if not distribution_csv: logger.error("准备分发CSV失败,程序退出") return @@ -840,7 +1018,7 @@ def main(): send_result_csv = send_emails( distribution_csv, args.output_dir, args.email_from, args.email_password, args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode, - args.delay, args.zip_filename, args.test_email + args.delay, args.zip_filename, args.test_email, args.force_attachments ) # 记录分发结果到数据库 @@ -853,7 +1031,7 @@ def main(): logger.info("测试邮箱模式,不记录分发结果到数据库") # 生成分发报告 - generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir) + generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir, insufficient_content_users) logger.info("内容分发流程完成") diff --git a/scripts/distribution/distribute_example.sh b/scripts/distribution/distribute_example.sh index abd0dcb..0b47d7a 100755 --- a/scripts/distribution/distribute_example.sh +++ b/scripts/distribution/distribute_example.sh @@ -14,8 +14,10 @@ DB_PATH="$BASE_DIR/distribution.db" # 设置邮件相关变量 EMAIL_FROM="zwysendemail@163.com" EMAIL_PASSWORD="NMhVGFmCJkGEy3B5" -SUBJECT="文旅小红书带货笔记内容0515" -ZIP_FILENAME="文旅小红书带货笔记内容0515" +# EMAIL_FROM="zowoyomedia@163.com" +# EMAIL_PASSWORD="SDj5fK6Tk9YevmsD" +SUBJECT="文旅小红书带货笔记内容0516" +ZIP_FILENAME="文旅小红书带货笔记内容0516" # 设置分发配置 ARTICLE_PER_USER=1 @@ -31,12 +33,15 @@ UNDISTRIBUTED_ONLY=true # 只分发未分发的内容 # 内容筛选配置 TARGET_PRODUCT="" # 为空则不筛选特定产品 -TARGET_OBJECT="极爽冲浪馆" # 为空则不筛选特定景点 +TARGET_OBJECT="深圳青青世界酒店" # 为空则不筛选特定景点 # 用户筛选配置 TARGET_USER_ID="" # 为空则不筛选特定用户ID TARGET_USER_EMAIL="" # 为空则不筛选特定用户邮箱 +# 强制性附件配置 +FORCE_ATTACHMENTS="/root/autodl-tmp/TravelContentCreator/hotel_img/标题参考格式-精选.txt" + # 创建必要的目录 mkdir -p "$LOG_DIR" mkdir -p "$OUTPUT_DIR" @@ -85,6 +90,20 @@ if [ "$TEST_EMAIL_MODE" = true ]; then fi fi +# 检查强制性附件配置 +if [ -n "$FORCE_ATTACHMENTS" ]; then + echo "已配置强制性附件: $FORCE_ATTACHMENTS" + # 验证文件是否存在 + IFS=',' read -ra ATTACHMENT_ARRAY <<< "$FORCE_ATTACHMENTS" + for attachment in "${ATTACHMENT_ARRAY[@]}"; do + if [ ! -e "$attachment" ]; then + echo "警告: 强制性附件文件或目录不存在: $attachment" + else + echo "已确认强制性附件存在: $attachment" + fi + done +fi + # 数据库状态统计 echo "数据库状态统计:" sqlite3 "$DB_PATH" < 0: + confirm_delete = input(f"警告: 数据库中有 {distribution_count} 条分发记录与用户关联,确定要删除所有用户吗? (y/n): ").strip().lower() + if confirm_delete != 'y': + print("取消清除用户数据") + return False + + # 清除所有用户数据 + cursor.execute("DELETE FROM users") + conn.commit() + print(f"已清除所有历史用户数据") + except sqlite3.Error as e: + print(f"清除用户数据时出错: {e}") + conn.rollback() + return False + + # 检查数据库中已存在的小红书User码 + cursor.execute("SELECT xhs_user_id FROM users WHERE xhs_user_id IS NOT NULL") + existing_user_ids = set(row[0] for row in cursor.fetchall() if row[0]) + print(f"数据库中已有 {len(existing_user_ids)} 个带有小红书User码的用户") + + except sqlite3.Error as e: + print(f"数据库连接错误: {e}") + return False + + # 计数器 + success_count = 0 + skip_count = 0 + error_count = 0 + update_count = 0 + # 记录失败和跳过的条目 + failed_entries = [] + skipped_entries = [] + + # 读取CSV文件并导入数据 + try: + with open(csv_file, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + # 跳过标题行 + headers = next(reader) + print(f"CSV文件标题: {headers}") + + for row_num, row in enumerate(reader, 2): # 从2开始计数,因为第1行是标题 + if len(row) >= 4: # 确保行至少有4列 + try: + # 从CSV提取数据 (小红书User码, 小红书ID, 粉丝数, 邮箱) + xhs_user_id, username, fans, email = row + + # 如果邮箱为空,跳过 + if not email: + skip_info = f"邮箱为空" + skipped_entries.append((row_num, row, skip_info)) + print(f"警告: 跳过无邮箱用户: {username} (行 {row_num})") + skip_count += 1 + continue + + # 确保邮箱格式正确 + if '@' not in email: + error_info = f"无效邮箱格式" + failed_entries.append((row_num, row, error_info)) + print(f"警告: 跳过无效邮箱: {email} (行 {row_num})") + error_count += 1 + continue + + # 检查小红书User码是否有效 + if not xhs_user_id: + error_info = f"小红书User码为空" + failed_entries.append((row_num, row, error_info)) + print(f"警告: 小红书User码为空: {email} (行 {row_num})") + error_count += 1 + continue + + # 检查小红书User码是否已存在,如果存在则更新信息 + if xhs_user_id in existing_user_ids: + print(f"用户User码已存在,将更新: {xhs_user_id} -> {username}, {email}") + cursor.execute(""" + UPDATE users + SET username = ?, email = ? + WHERE xhs_user_id = ? + """, (username, email, xhs_user_id)) + update_count += 1 + skipped_entries.append((row_num, row, "用户User码已存在,已更新信息")) + else: + # 插入新用户 + cursor.execute(""" + INSERT INTO users (email, username, xhs_user_id) + VALUES (?, ?, ?) + """, (email, username, xhs_user_id)) + existing_user_ids.add(xhs_user_id) # 添加到已存在列表 + success_count += 1 + print(f"插入新用户: {xhs_user_id} -> {username}, {email}") + + except sqlite3.IntegrityError as e: + # 处理唯一约束冲突 + if "UNIQUE constraint failed: users.xhs_user_id" in str(e): + error_info = f"小红书User码重复" + failed_entries.append((row_num, row, error_info)) + print(f"错误: 小红书User码重复: {xhs_user_id} (行 {row_num})") + else: + error_info = f"数据库完整性错误: {e}" + failed_entries.append((row_num, row, error_info)) + print(f"插入行时出错 (行 {row_num}): {row}, 错误: {e}") + error_count += 1 + except Exception as e: + error_info = f"数据库错误: {e}" + failed_entries.append((row_num, row, error_info)) + print(f"插入行时出错 (行 {row_num}): {row}, 错误: {e}") + error_count += 1 + else: + error_info = f"列数不足,需要至少4列" + failed_entries.append((row_num, row, error_info)) + print(f"警告: 跳过格式不正确的行 {row_num}: {row}") + error_count += 1 + + # 提交事务 + conn.commit() + print(f"\n导入完成! 成功插入: {success_count}, 更新: {update_count}, 跳过: {skip_count}, 失败: {error_count}") + + # 如果有失败的条目,输出详细信息 + if failed_entries: + print("\n===== 导入失败的记录 =====") + for row_num, row, error in failed_entries: + print(f"行 {row_num}: {row} - 失败原因: {error}") + + # 将失败记录保存到文件 + failure_file = os.path.join(os.path.dirname(csv_file), "import_failures_5.15.csv") + with open(failure_file, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + writer.writerow(['行号', '原始数据', '失败原因']) + for row_num, row, error in failed_entries: + writer.writerow([row_num, ','.join(row), error]) + + print(f"\n失败记录已保存到: {failure_file}") + + # 如果有跳过的条目,输出详细信息 + if skipped_entries: + print("\n===== 跳过的记录 =====") + for row_num, row, reason in skipped_entries: + print(f"行 {row_num}: {row} - 跳过原因: {reason}") + + except Exception as e: + print(f"读取CSV文件时出错: {e}") + conn.rollback() + return False + finally: + conn.close() + + return True + +if __name__ == "__main__": + if main(): + print("用户数据导入成功!") + else: + print("用户数据导入失败!") + sys.exit(1) \ No newline at end of file