From 1a8ddef1ff0189e2b96f77366d6ea856777fb451 Mon Sep 17 00:00:00 2001 From: jinye_huang Date: Tue, 13 May 2025 19:01:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/distribution/distribute_content.py | 675 +++++++++++------- scripts/distribution/distribute_content_db.py | 0 scripts/distribution/distribute_example.sh | 232 ++++-- scripts/distribution/extract_and_render.py | 196 ++++- 4 files changed, 776 insertions(+), 327 deletions(-) create mode 100644 scripts/distribution/distribute_content_db.py diff --git a/scripts/distribution/distribute_content.py b/scripts/distribution/distribute_content.py index fac3566..7de064d 100644 --- a/scripts/distribution/distribute_content.py +++ b/scripts/distribution/distribute_content.py @@ -7,7 +7,7 @@ import pandas as pd import argparse import random import logging -import subprocess +import sqlite3 from datetime import datetime import json import smtplib @@ -24,7 +24,7 @@ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ - logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"), + logging.FileHandler(f"content_distribution_db_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"), logging.StreamHandler() ] ) @@ -32,13 +32,9 @@ logger = logging.getLogger(__name__) def parse_arguments(): """解析命令行参数""" - parser = argparse.ArgumentParser(description='内容分发') + parser = argparse.ArgumentParser(description='内容分发 (数据库版)') # 必需参数 - parser.add_argument('--user-csv', type=str, required=True, - help='小红书用户CSV文件路径') - parser.add_argument('--manifest-csv', type=str, required=True, - help='内容清单CSV文件路径') parser.add_argument('--output-dir', type=str, default='distribution_results', help='分发结果输出目录') parser.add_argument('--email-from', type=str, required=True, @@ -46,11 +42,29 @@ def parse_arguments(): parser.add_argument('--email-password', type=str, required=True, help='发件人邮箱授权码') + # 数据库相关参数 + parser.add_argument('--db-path', type=str, default='/root/autodl-tmp/TravelContentCreator/distribution.db', + help='数据库文件路径') + + # 内容筛选参数 + parser.add_argument('--product', type=str, default=None, + help='指定产品名称进行筛选') + parser.add_argument('--object', type=str, default=None, + help='指定景点名称进行筛选') + parser.add_argument('--undistributed-only', action='store_true', + help='只选择未分发的内容') + + # 用户筛选参数 + parser.add_argument('--user-id', type=int, default=None, + help='指定用户ID进行分发') + parser.add_argument('--user-email', type=str, default=None, + help='指定用户邮箱进行分发') + parser.add_argument('--max-users', type=int, default=None, + help='最大用户数量') + # 可选参数 parser.add_argument('--article-per-user', type=int, default=3, help='每个用户分配的文章数量') - parser.add_argument('--max-send-count', type=int, default=None, - help='最大发送数量限制') parser.add_argument('--subject', type=str, default='您的旅游内容创作已就绪', help='邮件主题') parser.add_argument('--smtp-server', type=str, default='smtp.163.com', @@ -59,25 +73,200 @@ def parse_arguments(): help='SMTP服务器端口') parser.add_argument('--use-ssl', action='store_true', help='使用SSL连接SMTP服务器') - parser.add_argument('--email-column', type=str, default='达人邮箱', - help='用户CSV中邮箱列的名称') - parser.add_argument('--username-column', type=str, default='小红书ID', - help='用户CSV中用户名列的名称') parser.add_argument('--judge-only-success', action='store_true', help='只分发审核成功的内容') parser.add_argument('--test-mode', action='store_true', help='测试模式,不实际发送邮件') 
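    # The new --db-path option points at a SQLite database that this patch queries but never
    # creates. A minimal sketch of the schema the queries in this file appear to expect —
    # table and column names are inferred from the SQL used below and are assumptions, not
    # the project's actual DDL:
    #
    #   CREATE TABLE IF NOT EXISTS contents (
    #       id INTEGER PRIMARY KEY AUTOINCREMENT,
    #       entry_id TEXT, output_txt_path TEXT, poster_path TEXT, article_json_path TEXT,
    #       product TEXT, object TEXT, date TEXT, logic TEXT,
    #       judge_status INTEGER DEFAULT 0,     -- 1 = passed review (--judge-only-success filter)
    #       is_distributed INTEGER DEFAULT 0    -- 1 = already distributed (--undistributed-only filter)
    #   );
    #   CREATE TABLE IF NOT EXISTS users (
    #       id INTEGER PRIMARY KEY AUTOINCREMENT,
    #       email TEXT NOT NULL,
    #       username TEXT
    #   );
    #   CREATE TABLE IF NOT EXISTS distributions (
    #       id INTEGER PRIMARY KEY AUTOINCREMENT,
    #       content_id INTEGER REFERENCES contents(id),
    #       user_id INTEGER REFERENCES users(id),
    #       distribution_date TEXT,
    #       send_status TEXT,
    #       batch_id TEXT
    #   );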
parser.add_argument('--delay', type=int, default=2, help='每封邮件发送之间的延迟时间(秒)') - parser.add_argument('--previous-distribution', type=str, default=None, - help='上一次分发结果CSV或报告文件路径,用于避免重复发送') - parser.add_argument('--skip-sent-success', action='store_true', - help='跳过上次成功发送的文章') parser.add_argument('--zip-filename', type=str, default=None, help='指定ZIP压缩包的基本文件名(不含扩展名),如"文旅小红书带货笔记内容0512"') return parser.parse_args() +def create_database_connection(db_path): + """创建到SQLite数据库的连接""" + try: + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row # 使结果以字典形式访问 + return conn + except sqlite3.Error as e: + logger.error(f"连接数据库失败: {e}") + return None + +def query_contents_from_database(conn, product=None, object=None, judge_only_success=False, undistributed_only=False): + """从数据库查询内容""" + try: + cursor = conn.cursor() + + # 构建查询条件 + conditions = [] + params = [] + + if product: + conditions.append("product LIKE ?") + params.append(f"%{product}%") + + if object: + conditions.append("object LIKE ?") + params.append(f"%{object}%") + + if judge_only_success: + conditions.append("judge_status = 1") + + if undistributed_only: + conditions.append("is_distributed = 0") + + # 构建SQL查询 + sql = "SELECT * FROM contents" + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + # 执行查询 + cursor.execute(sql, params) + results = cursor.fetchall() + + # 转换为DataFrame + content_list = [] + for row in results: + content_list.append({ + 'id': row['id'], # 内部ID,用于数据库关联 + 'EntryID': row['entry_id'], + 'OutputTxtPath': row['output_txt_path'], + 'PosterPath': row['poster_path'], + 'ArticleJsonPath': row['article_json_path'], + 'Product': row['product'], + 'Object': row['object'], + 'Date': row['date'], + 'Logic': row['logic'], + 'JudgeStatus': row['judge_status'], + 'IsDistributed': row['is_distributed'] + }) + + content_df = pd.DataFrame(content_list) + + # 输出查询统计信息 + logger.info(f"从数据库查询到 {len(content_df)} 条内容") + if product: + logger.info(f"按产品筛选: {product}") + if object: + logger.info(f"按景点筛选: {object}") + if judge_only_success: + logger.info("仅显示审核通过的内容") + if undistributed_only: + logger.info("仅显示未分发的内容") + + return content_df + + except Exception as e: + logger.error(f"从数据库查询内容失败: {e}") + return None + +def query_users_from_database(conn, user_id=None, user_email=None, max_users=None): + """从数据库查询用户""" + try: + cursor = conn.cursor() + + # 构建查询条件 + conditions = [] + params = [] + + if user_id: + conditions.append("id = ?") + params.append(user_id) + + if user_email: + conditions.append("email LIKE ?") + params.append(f"%{user_email}%") + + # 构建SQL查询 + sql = "SELECT id, email, username FROM users" + if conditions: + sql += " WHERE " + " AND ".join(conditions) + + # 限制用户数量 + if max_users: + sql += f" LIMIT {max_users}" + + # 执行查询 + cursor.execute(sql, params) + results = cursor.fetchall() + + # 转换为DataFrame + user_list = [] + for row in results: + user_list.append({ + 'id': row['id'], + 'email': row['email'], + 'username': row['username'] or row['email'].split('@')[0] + }) + + user_df = pd.DataFrame(user_list) + + # 输出查询统计信息 + logger.info(f"从数据库查询到 {len(user_df)} 个用户") + if user_id: + logger.info(f"按用户ID筛选: {user_id}") + if user_email: + logger.info(f"按用户邮箱筛选: {user_email}") + if max_users: + logger.info(f"限制最大用户数: {max_users}") + + return user_df + + except Exception as e: + logger.error(f"从数据库查询用户失败: {e}") + return None + +def allocate_content_to_users(users_df, content_df, article_per_user): + """为用户分配内容""" + try: + # 创建用户列表 + users = [] + for _, row in users_df.iterrows(): + users.append({ + 'id': 
row['id'], + 'email': row['email'], + 'username': row['username'], + 'contents': [] + }) + + # 转换为记录列表 + content_list = content_df.to_dict('records') + if not content_list: + logger.warning("没有可用内容进行分配") + return [] + + # 随机打乱内容列表 + random.shuffle(content_list) + + # 为每个用户分配内容 + content_allocated = [] + content_index = 0 + + for user in users: + user_contents = [] + for _ in range(article_per_user): + if content_index >= len(content_list): + content_index = 0 # 如果内容不够,循环使用 + logger.warning("内容不足,将循环使用现有内容") + + content = content_list[content_index] + user_contents.append(content) + content_allocated.append(content) + content_index += 1 + + user['contents'] = user_contents + + logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容") + unique_content_count = len(set([c['EntryID'] for c in content_allocated])) + logger.info(f"分配的唯一内容条数: {unique_content_count}") + + return users + except Exception as e: + logger.error(f"分配内容失败: {e}") + return [] + def find_additional_images(file_path): """查找与文章相关的所有额外图片""" if not file_path or pd.isna(file_path) or not os.path.exists(file_path): @@ -105,161 +294,6 @@ def find_additional_images(file_path): logger.info(f"找到 {len(image_files)} 张额外图片: {additional_images_dir}") return image_files -def read_user_csv(user_csv_path, email_column, username_column): - """读取用户CSV文件""" - try: - df = pd.read_csv(user_csv_path) - - # 检查必要的列是否存在 - if email_column not in df.columns: - logger.error(f"用户CSV中缺少邮箱列 '{email_column}'") - return None - - # 过滤有效邮箱(非空且包含@符号) - df = df[df[email_column].notna()] - df = df[df[email_column].astype(str).str.contains('@')] - - # 获取用户名列,如果不存在则创建默认用户名 - if username_column not in df.columns: - logger.warning(f"用户CSV中缺少用户名列 '{username_column}',使用邮箱前缀作为用户名") - df[username_column] = df[email_column].apply(lambda x: x.split('@')[0]) - - logger.info(f"成功读取 {len(df)} 个有效用户") - return df - except Exception as e: - logger.error(f"读取用户CSV失败: {e}") - return None - -def read_manifest_csv(manifest_csv_path, judge_only_success=False): - """读取内容清单CSV文件""" - try: - df = pd.read_csv(manifest_csv_path) - - # 检查必要的列是否存在 - required_columns = ['OutputTxtPath', 'PosterPath', 'ArticleJsonPath'] - missing_columns = [col for col in required_columns if col not in df.columns] - if missing_columns: - logger.error(f"内容清单CSV中缺少必要列: {', '.join(missing_columns)}") - return None - - # 过滤审核成功的内容 - if judge_only_success and 'JudgeStatus' in df.columns: - original_count = len(df) - df = df[df['JudgeStatus'] == True] - logger.info(f"只保留审核成功的内容,从 {original_count} 条过滤为 {len(df)} 条") - - logger.info(f"成功读取 {len(df)} 条内容") - return df - except Exception as e: - logger.error(f"读取内容清单CSV失败: {e}") - return None - -def read_previous_distribution(previous_file, skip_sent_success=True): - """读取上一次的分发结果,获取已发放过的文章ID""" - if not previous_file or not os.path.exists(previous_file): - logger.info("未提供上一次分发结果文件或文件不存在,将不过滤已发放文章") - return set() - - try: - df = pd.read_csv(previous_file) - - # 确定文件类型并提取相关列 - if 'entry_id' in df.columns: # distribution.csv 或 email_results.csv - if 'send_status' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['send_status'].str.contains('success', na=False)]['entry_id'].unique() - else: - # 过滤所有分配过的 - already_sent = df['entry_id'].unique() - elif 'EntryID' in df.columns: # manifest_with_dist.csv - if 'sent_success' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['sent_success'] > 0]['EntryID'].unique() - else: - # 过滤所有分配过的 - already_sent = df[df['assigned_count'] > 0]['EntryID'].unique() - elif 'entry_id' in 
df.columns: # distribution_summary.csv - if 'success' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['success'] > 0]['entry_id'].unique() - else: - # 过滤所有分配过的 - already_sent = df['entry_id'].unique() - else: - logger.warning(f"无法识别的分发结果文件格式: {previous_file}") - return set() - - already_sent_set = set(already_sent) - logger.info(f"从上一次分发结果中找到 {len(already_sent_set)} 篇已发放文章") - return already_sent_set - - except Exception as e: - logger.error(f"读取上一次分发结果失败: {e}") - return set() - -def allocate_content_to_users(users_df, content_df, article_per_user, - email_column, username_column, max_send_count=None, - already_sent_ids=None): - """为用户分配内容""" - try: - # 创建用户列表 - users = [] - for _, row in users_df.iterrows(): - email = row[email_column] - username = row[username_column] if not pd.isna(row[username_column]) else f"用户_{_}" - users.append({ - 'email': email, - 'username': username, - 'contents': [] - }) - - # 转换为记录列表并过滤已发放的文章 - content_list = content_df.to_dict('records') - if already_sent_ids: - original_count = len(content_list) - content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids] - filtered_count = original_count - len(content_list) - logger.info(f"过滤掉 {filtered_count} 篇已发放文章,剩余 {len(content_list)} 篇可用") - - if not content_list: - logger.warning("过滤后没有可用文章,请提供新内容或关闭过滤功能") - return [] - - # 随机打乱内容列表 - random.shuffle(content_list) - - # 为每个用户分配内容 - content_allocated = [] - content_index = 0 - - # 限制最大发送数量 - if max_send_count is not None and max_send_count > 0: - users = users[:max_send_count] - logger.info(f"限制发送用户数量为 {max_send_count}") - - for user in users: - user_contents = [] - for _ in range(article_per_user): - if content_index >= len(content_list): - content_index = 0 # 如果内容不够,循环使用 - logger.warning("内容不足,将循环使用现有内容") - - content = content_list[content_index] - user_contents.append(content) - content_allocated.append(content) - content_index += 1 - - user['contents'] = user_contents - - logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容") - unique_content_count = len(set([c['EntryID'] for c in content_allocated])) - logger.info(f"分配的唯一内容条数: {unique_content_count}") - - return users - except Exception as e: - logger.error(f"分配内容失败: {e}") - return [] - def prepare_distribution_csv(users_with_content, output_dir): """准备分发CSV文件""" try: @@ -274,15 +308,15 @@ def prepare_distribution_csv(users_with_content, output_dir): additional_images = find_additional_images(output_txt_path) rows.append({ + 'user_id': user['id'], 'email': user['email'], 'username': user['username'], + 'content_id': content['id'], + 'entry_id': content['EntryID'], 'file_path': content['OutputTxtPath'], 'poster_path': content['PosterPath'], 'article_json_path': content['ArticleJsonPath'], 'additional_images': ';'.join(additional_images), # 保存为分号分隔的字符串 - 'entry_id': content['EntryID'], - 'topic_index': content.get('TopicIndex', ''), - 'variant_index': content.get('VariantIndex', ''), 'product': content.get('Product', ''), 'object': content.get('Object', ''), 'date': content.get('Date', ''), @@ -423,6 +457,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, if not files: logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过") results["details"].append({ + "user_id": rows[0]['user_id'], "email": email, "status": "skipped", "reason": "没有有效的附件文件", @@ -447,6 +482,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, else: logger.error(f"为邮箱 {email} 创建ZIP文件失败,跳过") results["details"].append({ + "user_id": rows[0]['user_id'], 
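+                    # rows[0]['user_id'] is the users.id read from the database (via the
+                    # distribution CSV); carrying it into every result record lets the send
+                    # results be traced back to the users table later.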
"email": email, "status": "failed", "reason": "创建ZIP文件失败", @@ -461,12 +497,14 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, email_content = f"""您好!请查收今日带货笔记(文案+配图),内容在文件压缩包内。具体挂载商品等操作流程请查看对应达人微信群内信息。 +共包含{entries_count}篇文章内容,请按照要求发布。 """ # 发送邮件 if test_mode: logger.info(f"测试模式: 模拟发送邮件到 {email}") results["details"].append({ + "user_id": rows[0]['user_id'], "email": email, "status": "success (test mode)", "files": files @@ -488,6 +526,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, if success: results["success"] += 1 results["details"].append({ + "user_id": rows[0]['user_id'], "email": email, "status": "success", "files": files @@ -495,6 +534,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, else: results["failed"] += 1 results["details"].append({ + "user_id": rows[0]['user_id'], "email": email, "status": "failed", "reason": "SMTP发送失败", @@ -549,7 +589,74 @@ def send_emails(distribution_csv, output_dir, email_from, email_password, logger.error(f"发送邮件过程中发生错误: {e}") return None -def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir): +def record_distribution_to_database(conn, distribution_csv, send_result_csv): + """记录分发结果到数据库""" + try: + # 读取分发CSV + dist_df = pd.read_csv(distribution_csv) + + # 读取发送结果CSV + if send_result_csv and os.path.exists(send_result_csv): + result_df = pd.read_csv(send_result_csv) + # 添加发送状态 + email_status = {} + for _, row in result_df.iterrows(): + email_status[row['email']] = row['status'] + + dist_df['send_status'] = dist_df['email'].map(email_status) + else: + dist_df['send_status'] = 'unknown' + + # 记录到distributions表 + cursor = conn.cursor() + batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # 添加记录 + insertion_count = 0 + for _, row in dist_df.iterrows(): + try: + user_id = row['user_id'] + content_id = row['content_id'] + + # 检查是否成功发送 + success = 'success' in str(row.get('send_status', '')).lower() + + # 插入分发记录 + cursor.execute(""" + INSERT INTO distributions + (content_id, user_id, distribution_date, send_status, batch_id) + VALUES (?, ?, ?, ?, ?) + """, ( + content_id, + user_id, + datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + row.get('send_status', 'unknown'), + batch_id + )) + + insertion_count += 1 + + # 如果成功发送,更新内容分发状态 + if success: + cursor.execute(""" + UPDATE contents + SET is_distributed = 1 + WHERE id = ? 
+ """, (content_id,)) + + except Exception as e: + logger.warning(f"记录分发结果失败: {row.get('entry_id')} -> {row.get('email')}, 错误: {e}") + + conn.commit() + logger.info(f"已记录 {insertion_count} 条分发记录到数据库,批次ID: {batch_id}") + + return True + except Exception as e: + logger.error(f"记录分发结果到数据库失败: {e}") + conn.rollback() + return False + +def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir): """生成分发报告""" try: # 读取分发CSV @@ -573,6 +680,7 @@ def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv entry_id = row['entry_id'] if entry_id not in article_stats: article_stats[entry_id] = { + 'content_id': row['content_id'], 'total_assigned': 0, 'sent_success': 0, 'sent_failed': 0, @@ -592,41 +700,43 @@ def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv else: article_stats[entry_id]['sent_unknown'] += 1 - # 读取原始清单 - manifest_df = pd.read_csv(manifest_csv) - - # 创建带分发状态的清单 - manifest_with_dist = manifest_df.copy() - manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map( - {k: v['total_assigned'] for k, v in article_stats.items()}) - manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map( - {k: v['sent_success'] for k, v in article_stats.items()}) - - # 填充NaN值 - manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int) - manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int) - - # 添加是否被分发的标记 - manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0 - - # 保存报告 - report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") - manifest_with_dist.to_csv(report_csv, index=False) - # 创建简洁的分发摘要报告 summary_data = [] + cursor = conn.cursor() + for entry_id, stats in article_stats.items(): - summary_data.append({ - 'entry_id': entry_id, - 'product': stats['product'], - 'object': stats['object'], - 'date': stats['date'], - 'logic': stats['logic'], - 'assigned': stats['total_assigned'], - 'success': stats['sent_success'], - 'failed': stats['sent_failed'], - 'unknown': stats['sent_unknown'] - }) + # 从数据库获取内容详情 + cursor.execute("SELECT * FROM contents WHERE id = ?", (stats['content_id'],)) + content = cursor.fetchone() + + if content: + summary_data.append({ + 'entry_id': entry_id, + 'content_id': stats['content_id'], + 'product': stats['product'] or content['product'], + 'object': stats['object'] or content['object'], + 'date': stats['date'] or content['date'], + 'logic': stats['logic'] or content['logic'], + 'judge_status': content['judge_status'], + 'is_distributed': content['is_distributed'], + 'assigned': stats['total_assigned'], + 'success': stats['sent_success'], + 'failed': stats['sent_failed'], + 'unknown': stats['sent_unknown'] + }) + else: + summary_data.append({ + 'entry_id': entry_id, + 'content_id': stats['content_id'], + 'product': stats['product'], + 'object': stats['object'], + 'date': stats['date'], + 'logic': stats['logic'], + 'assigned': stats['total_assigned'], + 'success': stats['sent_success'], + 'failed': stats['sent_failed'], + 'unknown': stats['sent_unknown'] + }) if summary_data: summary_df = pd.DataFrame(summary_data) @@ -640,29 +750,40 @@ def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv json.dump(article_stats, f, indent=2) # 统计摘要 - total_articles = len(manifest_df) - distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0) + cursor.execute("SELECT COUNT(*) FROM 
contents") + total_articles = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM contents WHERE is_distributed = 1") + distributed_articles = cursor.fetchone()[0] + success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0) - logger.info(f"分发报告已保存到: {report_csv}") logger.info(f"文章统计已保存到: {stats_json}") logger.info("\n===== 分发统计 =====") logger.info(f"总文章数: {total_articles}") logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)") - logger.info(f"成功发送文章数: {success_sent} ({success_sent/total_articles*100:.2f}%)") + logger.info(f"本次成功发送文章数: {success_sent}") # 按产品统计 - if 'Product' in manifest_df.columns: - product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0) - if not product_stats.empty: - logger.info("\n===== 按产品分发统计 =====") - for product, row in product_stats.iterrows(): - if True in row: - distributed = row.get(True, 0) - total = row.sum() - logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)") + cursor.execute(""" + SELECT product, + COUNT(*) as total, + SUM(CASE WHEN is_distributed = 1 THEN 1 ELSE 0 END) as distributed + FROM contents + GROUP BY product + """) + product_stats = cursor.fetchall() - return report_csv + if product_stats: + logger.info("\n===== 按产品分发统计 =====") + for row in product_stats: + product = row['product'] + total = row['total'] + distributed = row['distributed'] + if total > 0: + logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)") + + return stats_json except Exception as e: logger.error(f"生成分发报告失败: {e}") return None @@ -673,51 +794,73 @@ def main(): # 创建输出目录 os.makedirs(args.output_dir, exist_ok=True) - # 读取用户CSV - users_df = read_user_csv(args.user_csv, args.email_column, args.username_column) - if users_df is None: - logger.error("无法处理用户CSV,程序退出") + # 连接数据库 + conn = create_database_connection(args.db_path) + if not conn: + logger.error("无法连接到数据库,程序退出") return - # 读取内容清单CSV - content_df = read_manifest_csv(args.manifest_csv, args.judge_only_success) - if content_df is None: - logger.error("无法处理内容清单CSV,程序退出") - return - - # 读取上一次分发结果(如果提供) - already_sent_ids = None - if args.previous_distribution: - already_sent_ids = read_previous_distribution(args.previous_distribution, args.skip_sent_success) - - # 为用户分配内容 - users_with_content = allocate_content_to_users( - users_df, content_df, args.article_per_user, - args.email_column, args.username_column, args.max_send_count, - already_sent_ids - ) - - if not users_with_content: - logger.error("内容分配失败,程序退出") - return - - # 准备分发CSV - distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir) - if not distribution_csv: - logger.error("准备分发CSV失败,程序退出") - return - - # 发送邮件 - send_result_csv = send_emails( - distribution_csv, args.output_dir, args.email_from, args.email_password, - args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode, - args.delay, args.zip_filename - ) - - # 生成分发报告 - generate_distribution_report(distribution_csv, send_result_csv, args.manifest_csv, args.output_dir) - - logger.info("内容分发流程完成") + try: + # 从数据库查询内容 + content_df = query_contents_from_database( + conn, + product=args.product, + object=args.object, + judge_only_success=args.judge_only_success, + undistributed_only=args.undistributed_only + ) + + if content_df is None or len(content_df) == 0: + logger.error("没有找到符合条件的内容,程序退出") + return + + # 从数据库查询用户 + users_df = query_users_from_database( + conn, + user_id=args.user_id, 
+ user_email=args.user_email, + max_users=args.max_users + ) + + if users_df is None or len(users_df) == 0: + logger.error("没有找到符合条件的用户,程序退出") + return + + # 为用户分配内容 + users_with_content = allocate_content_to_users( + users_df, content_df, args.article_per_user + ) + + if not users_with_content: + logger.error("内容分配失败,程序退出") + return + + # 准备分发CSV + distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir) + if not distribution_csv: + logger.error("准备分发CSV失败,程序退出") + return + + # 发送邮件 + send_result_csv = send_emails( + distribution_csv, args.output_dir, args.email_from, args.email_password, + args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode, + args.delay, args.zip_filename + ) + + # 记录分发结果到数据库 + if not args.test_mode: + record_distribution_to_database(conn, distribution_csv, send_result_csv) + else: + logger.info("测试模式,不记录分发结果到数据库") + + # 生成分发报告 + generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir) + + logger.info("内容分发流程完成") + + finally: + conn.close() if __name__ == "__main__": - main() \ No newline at end of file + main() \ No newline at end of file diff --git a/scripts/distribution/distribute_content_db.py b/scripts/distribution/distribute_content_db.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/distribution/distribute_example.sh b/scripts/distribution/distribute_example.sh index 8962e1c..fccae78 100755 --- a/scripts/distribution/distribute_example.sh +++ b/scripts/distribution/distribute_example.sh @@ -1,6 +1,7 @@ #!/bin/bash +# 内容分发系统数据库版启动脚本(完全从数据库获取数据) -# 设置时间戳变量 +# 设置默认时间戳变量 TIMESTAMP=$(date +"%Y%m%d_%H%M%S") # 设置路径变量 @@ -8,72 +9,195 @@ BASE_DIR="/root/autodl-tmp/TravelContentCreator" LOG_DIR="$BASE_DIR/log" RESULT_DIR="$BASE_DIR/output/$TIMESTAMP" OUTPUT_DIR="$RESULT_DIR/distribution_results" -# 设置其他变量 -USER_CSV="$BASE_DIR/output/5.12.csv" -MANIFEST_CSV="$BASE_DIR/output/2025-05-12_18-30-15/manifest_2025-05-12_18-30-15.csv" +DB_PATH="$BASE_DIR/distribution.db" + +# 设置邮件相关变量 EMAIL_FROM="zwysendemail@163.com" EMAIL_PASSWORD="NMhVGFmCJkGEy3B5" -SUBJECT="文旅小红书带货笔记内容0512" -# 上一次分发结果文件(如果存在) -PREVIOUS_DIST="" -# 压缩包文件名 -ZIP_FILENAME="文旅小红书带货笔记内容0512" +SUBJECT="文旅小红书带货笔记内容0513" +ZIP_FILENAME="文旅小红书带货笔记内容0513" + +# 设置分发配置 +ARTICLE_PER_USER=1 +MAX_USERS=10 # 最多发送给多少用户,不限制则设置为-1 +TEST_MODE=true # 测试模式,不实际发送邮件 +JUDGE_ONLY=true # 只分发审核通过的内容 +UNDISTRIBUTED_ONLY=true # 只分发未分发的内容 + +# 内容筛选配置 +TARGET_PRODUCT="【奇妙萌可卡牌套票】四季梦幻亲子乐园单人票" # 为空则不筛选特定产品 +TARGET_OBJECT="" # 为空则不筛选特定景点 + +# 用户筛选配置 +TARGET_USER_ID="" # 为空则不筛选特定用户ID +TARGET_USER_EMAIL="" # 为空则不筛选特定用户邮箱 # 创建必要的目录 mkdir -p "$LOG_DIR" mkdir -p "$OUTPUT_DIR" # 将日志同时输出到控制台和日志文件 -LOG_FILE="$LOG_DIR/distribution_$TIMESTAMP.log" +LOG_FILE="$LOG_DIR/distribution_db_$TIMESTAMP.log" exec > >(tee -a "$LOG_FILE") 2>&1 -echo "开始执行分发脚本 - $(date)" +echo "===================================================" +echo "内容分发系统启动 (数据库全依赖版) - $(date)" +echo "===================================================" echo "日志保存在: $LOG_FILE" echo "结果保存在: $RESULT_DIR" -# 测试模式运行 -echo "在测试模式下运行,不会实际发送邮件..." -python scripts/distribute_content.py \ - --user-csv "$USER_CSV" \ - --manifest-csv "$MANIFEST_CSV" \ - --output-dir "$OUTPUT_DIR" \ - --email-from "$EMAIL_FROM" \ - --email-password "$EMAIL_PASSWORD" \ - --subject "$SUBJECT" \ - --article-per-user 1 \ - --judge-only-success \ - --previous-distribution "$PREVIOUS_DIST" \ - --skip-sent-success \ - --zip-filename "$ZIP_FILENAME" +# 检查数据库是否存在 +if [ ! 
-f "$DB_PATH" ]; then + echo "数据库文件不存在: $DB_PATH" + echo "请先初始化数据库或检查路径是否正确" + exit 1 +fi -# 实际发送邮件的命令(取消注释以启用) -# echo "开始实际发送邮件..." -# python scripts/distribute_content.py \ -# --user-csv "$USER_CSV" \ -# --manifest-csv "$MANIFEST_CSV" \ -# --output-dir "$OUTPUT_DIR" \ -# --email-from "$EMAIL_FROM" \ -# --email-password "$EMAIL_PASSWORD" \ -# --subject "$SUBJECT" \ -# --article-per-user 3 \ -# --use-ssl \ -# --smtp-port 465 \ -# --judge-only-success \ -# --max-send-count 10 \ # 限制最多发送给10个用户 -# --previous-distribution "$PREVIOUS_DIST" \ -# --skip-sent-success \ -# --zip-filename "$ZIP_FILENAME" +# 检查内容表是否有记录 +CONTENT_COUNT=$(sqlite3 "$DB_PATH" "SELECT COUNT(*) FROM contents;") +if [ "$CONTENT_COUNT" -eq 0 ]; then + echo "警告: 内容表中没有记录,请先导入内容" + exit 1 +fi -# 不使用过滤功能的示例 -# python scripts/distribute_content.py \ -# --user-csv "$USER_CSV" \ -# --manifest-csv "$MANIFEST_CSV" \ -# --output-dir "$OUTPUT_DIR" \ -# --email-from "$EMAIL_FROM" \ -# --email-password "$EMAIL_PASSWORD" \ -# --subject "$SUBJECT" \ -# --article-per-user 3 \ -# --judge-only-success \ -# --zip-filename "$ZIP_FILENAME" +# 检查用户表是否有记录 +USER_COUNT=$(sqlite3 "$DB_PATH" "SELECT COUNT(*) FROM users;") +if [ "$USER_COUNT" -eq 0 ]; then + echo "警告: 用户表中没有记录,请先导入用户" + exit 1 +fi -echo "脚本执行完成 - $(date)" \ No newline at end of file +# 数据库状态统计 +echo "数据库状态统计:" +sqlite3 "$DB_PATH" <