From b2e8ec3bec843d56de0df8283322934e4c48b8ff Mon Sep 17 00:00:00 2001
From: jinye_huang
Date: Tue, 13 May 2025 09:34:59 +0800
Subject: [PATCH] Distribution module: basically mature
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scripts/distribution/distribute_content.py | 723 +++++++++++++++++++++
 scripts/distribution/distribute_example.sh |  79 +++
 scripts/distribution/extract_and_render.py | 383 +++++++++++
 3 files changed, 1185 insertions(+)
 create mode 100644 scripts/distribution/distribute_content.py
 create mode 100755 scripts/distribution/distribute_example.sh
 create mode 100644 scripts/distribution/extract_and_render.py

diff --git a/scripts/distribution/distribute_content.py b/scripts/distribution/distribute_content.py
new file mode 100644
index 0000000..fac3566
--- /dev/null
+++ b/scripts/distribution/distribute_content.py
@@ -0,0 +1,723 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import pandas as pd
+import argparse
+import random
+import logging
+from datetime import datetime
+import json
+import smtplib
+import zipfile
+import time
+import glob
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from email.mime.application import MIMEApplication
+from email.header import Header
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+def parse_arguments():
+    """Parse command-line arguments."""
+    parser = argparse.ArgumentParser(description='Content distribution')
+
+    # Required arguments
+    parser.add_argument('--user-csv', type=str, required=True,
+                        help='Path to the Xiaohongshu user CSV file')
+    parser.add_argument('--manifest-csv', type=str, required=True,
+                        help='Path to the content manifest CSV file')
+    parser.add_argument('--output-dir', type=str, default='distribution_results',
+                        help='Output directory for distribution results')
+    parser.add_argument('--email-from', type=str, required=True,
+                        help='Sender email address')
+    parser.add_argument('--email-password', type=str, required=True,
+                        help='Sender email authorization code')
+
+    # Optional arguments
+    parser.add_argument('--article-per-user', type=int, default=3,
+                        help='Number of articles assigned to each user')
+    parser.add_argument('--max-send-count', type=int, default=None,
+                        help='Maximum number of users to send to')
+    parser.add_argument('--subject', type=str, default='Your travel content is ready',
+                        help='Email subject')
+    parser.add_argument('--smtp-server', type=str, default='smtp.163.com',
+                        help='SMTP server address')
+    parser.add_argument('--smtp-port', type=int, default=25,
+                        help='SMTP server port')
+    parser.add_argument('--use-ssl', action='store_true',
+                        help='Use SSL when connecting to the SMTP server')
+    parser.add_argument('--email-column', type=str, default='达人邮箱',
+                        help='Name of the email column in the user CSV')
+    parser.add_argument('--username-column', type=str, default='小红书ID',
+                        help='Name of the username column in the user CSV')
+    parser.add_argument('--judge-only-success', action='store_true',
+                        help='Distribute only content that passed review')
+    parser.add_argument('--test-mode', action='store_true',
+                        help='Test mode: do not actually send emails')
+    parser.add_argument('--delay', type=int, default=2,
+                        help='Delay between consecutive emails, in seconds')
+    parser.add_argument('--previous-distribution', type=str, default=None,
+                        help='Path to a previous distribution CSV or report, used to avoid re-sending')
+    parser.add_argument('--skip-sent-success', action='store_true',
+                        help='Skip articles that were sent successfully last time')
+    parser.add_argument('--zip-filename', type=str, default=None,
+                        help='Base filename (without extension) for the ZIP archive, e.g. "xhs_travel_notes_0512"')
+
+    return parser.parse_args()
+
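+# Example invocation (a minimal sketch; the paths and auth code are
+# hypothetical placeholders):
+#   python scripts/distribution/distribute_content.py \
+#       --user-csv users.csv --manifest-csv manifest.csv \
+#       --email-from sender@163.com --email-password AUTHCODE \
+#       --judge-only-success --test-mode
+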
+def find_additional_images(file_path):
+    """Find all additional images associated with an article."""
+    if not file_path or pd.isna(file_path) or not os.path.exists(file_path):
+        return []
+
+    # Get the article directory
+    article_dir = os.path.dirname(file_path)
+
+    # Look for a possible additional-images directory
+    additional_images_dir = os.path.join(article_dir, "additional_images")
+    if not os.path.exists(additional_images_dir):
+        # Try one level up
+        parent_dir = os.path.dirname(article_dir)
+        additional_images_dir = os.path.join(parent_dir, "additional_images")
+        if not os.path.exists(additional_images_dir):
+            return []
+
+    # Collect all image files
+    image_patterns = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp"]
+    image_files = []
+
+    for pattern in image_patterns:
+        image_files.extend(glob.glob(os.path.join(additional_images_dir, pattern)))
+
+    logger.info(f"Found {len(image_files)} additional images: {additional_images_dir}")
+    return image_files
+
+def read_user_csv(user_csv_path, email_column, username_column):
+    """Read the user CSV file."""
+    try:
+        df = pd.read_csv(user_csv_path)
+
+        # Check that the required columns exist
+        if email_column not in df.columns:
+            logger.error(f"User CSV is missing the email column '{email_column}'")
+            return None
+
+        # Keep only valid emails (non-empty and containing '@')
+        df = df[df[email_column].notna()]
+        df = df[df[email_column].astype(str).str.contains('@')]
+
+        # Use the username column if present; otherwise derive one from the email prefix
+        if username_column not in df.columns:
+            logger.warning(f"User CSV is missing the username column '{username_column}'; using the email prefix instead")
+            df[username_column] = df[email_column].apply(lambda x: x.split('@')[0])
+
+        logger.info(f"Read {len(df)} valid users")
+        return df
+    except Exception as e:
+        logger.error(f"Failed to read user CSV: {e}")
+        return None
+
+def read_manifest_csv(manifest_csv_path, judge_only_success=False):
+    """Read the content manifest CSV file."""
+    try:
+        df = pd.read_csv(manifest_csv_path)
+
+        # Check that the required columns exist
+        required_columns = ['OutputTxtPath', 'PosterPath', 'ArticleJsonPath']
+        missing_columns = [col for col in required_columns if col not in df.columns]
+        if missing_columns:
+            logger.error(f"Manifest CSV is missing required columns: {', '.join(missing_columns)}")
+            return None
+
+        # Keep only content that passed review
+        if judge_only_success and 'JudgeStatus' in df.columns:
+            original_count = len(df)
+            df = df[df['JudgeStatus'] == True]
+            logger.info(f"Keeping only reviewed content: filtered from {original_count} to {len(df)} entries")
+
+        logger.info(f"Read {len(df)} content entries")
+        return df
+    except Exception as e:
+        logger.error(f"Failed to read manifest CSV: {e}")
+        return None
+
+def read_previous_distribution(previous_file, skip_sent_success=True):
+    """Read a previous distribution result and collect the IDs of articles already distributed."""
+    if not previous_file or not os.path.exists(previous_file):
+        logger.info("No previous distribution file provided (or it does not exist); articles will not be filtered")
+        return set()
+
+    try:
+        df = pd.read_csv(previous_file)
+
+        # Identify the file type and extract the relevant columns. The branches
+        # are ordered from most to least specific: the original third branch
+        # (distribution_summary.csv) repeated the first condition and was
+        # unreachable, so the first check now also requires 'send_status'.
+        if 'entry_id' in df.columns and 'send_status' in df.columns:  # distribution.csv or email_results.csv
+            if skip_sent_success:
+                # Only articles that were sent successfully
+                already_sent = df[df['send_status'].str.contains('success', na=False)]['entry_id'].unique()
+            else:
+                # All assigned articles
+                already_sent = df['entry_id'].unique()
+        elif 'EntryID' in df.columns:  # manifest_with_dist.csv
+            if 'sent_success' in df.columns and skip_sent_success:
+                # Only articles that were sent successfully
+                already_sent = df[df['sent_success'] > 0]['EntryID'].unique()
+            else:
+                # All assigned articles
+                already_sent = df[df['assigned_count'] > 0]['EntryID'].unique()
+        elif 'entry_id' in df.columns:  # distribution_summary.csv
+            if 'success' in df.columns and skip_sent_success:
+                # Only articles that were sent successfully
+                already_sent = df[df['success'] > 0]['entry_id'].unique()
+            else:
+                # All assigned articles
+                already_sent = df['entry_id'].unique()
+        else:
+            logger.warning(f"Unrecognized distribution result format: {previous_file}")
+            return set()
+
+        already_sent_set = set(already_sent)
+        logger.info(f"Found {len(already_sent_set)} previously distributed articles in the last run")
+        return already_sent_set
+
+    except Exception as e:
+        logger.error(f"Failed to read the previous distribution result: {e}")
+        return set()
+
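+# The allocation below is a shuffled round-robin: e.g. with 4 available
+# articles and 3 users at --article-per-user 2, the users receive (a1,a2),
+# (a3,a4), (a1,a2) — the pool is recycled once exhausted (illustrative only).
+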
+def allocate_content_to_users(users_df, content_df, article_per_user,
+                              email_column, username_column, max_send_count=None,
+                              already_sent_ids=None):
+    """Allocate content to users."""
+    try:
+        # Build the user list
+        users = []
+        for _, row in users_df.iterrows():
+            email = row[email_column]
+            username = row[username_column] if not pd.isna(row[username_column]) else f"user_{_}"
+            users.append({
+                'email': email,
+                'username': username,
+                'contents': []
+            })
+
+        # Convert to a list of records and drop previously distributed articles
+        content_list = content_df.to_dict('records')
+        if already_sent_ids:
+            original_count = len(content_list)
+            content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids]
+            filtered_count = original_count - len(content_list)
+            logger.info(f"Filtered out {filtered_count} previously distributed articles; {len(content_list)} remain")
+
+        if not content_list:
+            logger.warning("No articles left after filtering; provide new content or disable filtering")
+            return []
+
+        # Shuffle the content list
+        random.shuffle(content_list)
+
+        # Allocate content to each user
+        content_allocated = []
+        content_index = 0
+
+        # Cap the number of recipients
+        if max_send_count is not None and max_send_count > 0:
+            users = users[:max_send_count]
+            logger.info(f"Recipient count capped at {max_send_count}")
+
+        for user in users:
+            user_contents = []
+            for _ in range(article_per_user):
+                if content_index >= len(content_list):
+                    content_index = 0  # Recycle content if there is not enough
+                    logger.warning("Not enough content; existing content will be recycled")
+
+                content = content_list[content_index]
+                user_contents.append(content)
+                content_allocated.append(content)
+                content_index += 1
+
+            user['contents'] = user_contents
+
+        logger.info(f"Allocated {len(content_allocated)} content entries to {len(users)} users")
+        unique_content_count = len(set([c['EntryID'] for c in content_allocated]))
+        logger.info(f"Unique content entries allocated: {unique_content_count}")
+
+        return users
+    except Exception as e:
+        logger.error(f"Content allocation failed: {e}")
+        return []
+
+def prepare_distribution_csv(users_with_content, output_dir):
+    """Prepare the distribution CSV file."""
+    try:
+        os.makedirs(output_dir, exist_ok=True)
+
+        # Assemble the CSV rows
+        rows = []
+        for user in users_with_content:
+            for content in user['contents']:
+                # Find additional images
+                output_txt_path = content['OutputTxtPath']
+                additional_images = find_additional_images(output_txt_path)
+
+                rows.append({
+                    'email': user['email'],
+                    'username': user['username'],
+                    'file_path': content['OutputTxtPath'],
+                    'poster_path': content['PosterPath'],
+                    'article_json_path': content['ArticleJsonPath'],
+                    'additional_images': ';'.join(additional_images),  # Stored as a semicolon-separated string
+                    'entry_id': content['EntryID'],
+                    'topic_index': content.get('TopicIndex', ''),
+                    'variant_index': content.get('VariantIndex', ''),
+                    'product': content.get('Product', ''),
+                    'object': content.get('Object', ''),
+                    'date': content.get('Date', ''),
+                    'logic': content.get('Logic', '')
+                })
+
+        # Create the DataFrame and save it
+        df = pd.DataFrame(rows)
+        distribution_csv = os.path.join(output_dir, f"distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
+        df.to_csv(distribution_csv, index=False)
+
+        logger.info(f"Distribution CSV saved to: {distribution_csv}")
+        return distribution_csv
+    except Exception as e:
+        logger.error(f"Failed to prepare the distribution CSV: {e}")
+        return None
+
{file_path}") + + logger.info(f"成功创建ZIP文件: {output_path}") + return output_path + except Exception as e: + logger.error(f"创建ZIP文件失败: {e}") + return None + +def send_single_email(from_addr, password, to_addr, subject, content, attachments, + smtp_server, smtp_port, use_ssl=False): + """发送单封邮件""" + try: + # 创建邮件对象 + message = MIMEMultipart() + message['From'] = Header(from_addr) + message['To'] = Header(to_addr) + message['Subject'] = Header(subject) + + # 添加正文 + message.attach(MIMEText(content, 'plain', 'utf-8')) + + # 添加附件 + for attachment in attachments: + if not os.path.exists(attachment): + logger.warning(f"附件不存在,跳过: {attachment}") + continue + + with open(attachment, 'rb') as f: + part = MIMEApplication(f.read()) + part.add_header('Content-Disposition', 'attachment', + filename=Header(os.path.basename(attachment), 'utf-8').encode()) + message.attach(part) + + # 连接SMTP服务器并发送 + if use_ssl: + server = smtplib.SMTP_SSL(smtp_server, smtp_port) + else: + server = smtplib.SMTP(smtp_server, smtp_port) + + server.login(from_addr, password) + server.sendmail(from_addr, to_addr, message.as_string()) + server.quit() + + logger.info(f"成功发送邮件到: {to_addr}") + return True + except Exception as e: + logger.error(f"发送邮件到 {to_addr} 失败: {e}") + return False + +def send_emails(distribution_csv, output_dir, email_from, email_password, + subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None): + """发送邮件""" + try: + # 确保输出目录存在 + os.makedirs(output_dir, exist_ok=True) + os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True) + + # 读取分发CSV + df = pd.read_csv(distribution_csv) + logger.info(f"从 {distribution_csv} 读取了 {len(df)} 条分发记录") + + # 按邮箱分组 + email_groups = {} + for _, row in df.iterrows(): + email = row['email'] + if email not in email_groups: + email_groups[email] = [] + email_groups[email].append(row) + + logger.info(f"共有 {len(email_groups)} 个邮箱需要发送") + + # 结果记录 + results = { + "total": len(email_groups), + "success": 0, + "failed": 0, + "details": [] + } + + # 临时ZIP文件列表 + temp_zips = [] + + # 发送邮件 + for email, rows in email_groups.items(): + try: + logger.info(f"处理邮箱: {email}") + + # 收集所有文件路径 + files = [] + for row in rows: + # 收集文本文件 + file_path = row.get('file_path') + if file_path and not pd.isna(file_path) and os.path.exists(file_path): + files.append(file_path) + + # 收集海报图片 + poster_path = row.get('poster_path') + if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path): + files.append(poster_path) + + # 收集所有额外图片 + additional_images = row.get('additional_images', '') + if additional_images and not pd.isna(additional_images): + for img_path in additional_images.split(';'): + if img_path.strip() and os.path.exists(img_path): + files.append(img_path) + + if not files: + logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过") + results["details"].append({ + "email": email, + "status": "skipped", + "reason": "没有有效的附件文件", + "files": [] + }) + results["failed"] += 1 + continue + + # 创建ZIP文件 + if zip_filename: + # 使用指定的文件名 + zip_filename_with_ext = f"{zip_filename}.zip" + else: + # 使用默认的时间戳文件名 + zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip" + + zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext) + + zip_file = create_zip_file(files, zip_path) + if zip_file: + temp_zips.append(zip_file) + else: + logger.error(f"为邮箱 {email} 创建ZIP文件失败,跳过") + results["details"].append({ + "email": email, + "status": "failed", + "reason": "创建ZIP文件失败", + "files": files + }) + results["failed"] += 1 + continue + 
+def send_emails(distribution_csv, output_dir, email_from, email_password,
+                subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None):
+    """Send the emails."""
+    try:
+        # Make sure the output directories exist
+        os.makedirs(output_dir, exist_ok=True)
+        os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True)
+
+        # Read the distribution CSV
+        df = pd.read_csv(distribution_csv)
+        logger.info(f"Read {len(df)} distribution records from {distribution_csv}")
+
+        # Group rows by email address
+        email_groups = {}
+        for _, row in df.iterrows():
+            email = row['email']
+            if email not in email_groups:
+                email_groups[email] = []
+            email_groups[email].append(row)
+
+        logger.info(f"{len(email_groups)} mailboxes to send to")
+
+        # Result bookkeeping
+        results = {
+            "total": len(email_groups),
+            "success": 0,
+            "failed": 0,
+            "details": []
+        }
+
+        # Temporary ZIP files
+        temp_zips = []
+
+        # Send the emails
+        for email, rows in email_groups.items():
+            try:
+                logger.info(f"Processing mailbox: {email}")
+
+                # Collect all file paths
+                files = []
+                for row in rows:
+                    # Text file
+                    file_path = row.get('file_path')
+                    if file_path and not pd.isna(file_path) and os.path.exists(file_path):
+                        files.append(file_path)
+
+                    # Poster image
+                    poster_path = row.get('poster_path')
+                    if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path):
+                        files.append(poster_path)
+
+                    # All additional images
+                    additional_images = row.get('additional_images', '')
+                    if additional_images and not pd.isna(additional_images):
+                        for img_path in additional_images.split(';'):
+                            if img_path.strip() and os.path.exists(img_path):
+                                files.append(img_path)
+
+                if not files:
+                    logger.warning(f"Mailbox {email} has no valid attachment files, skipping")
+                    results["details"].append({
+                        "email": email,
+                        "status": "skipped",
+                        "reason": "No valid attachment files",
+                        "files": []
+                    })
+                    results["failed"] += 1
+                    continue
+
+                # Create the ZIP file
+                if zip_filename:
+                    # Use the specified filename
+                    zip_filename_with_ext = f"{zip_filename}.zip"
+                else:
+                    # Use a default timestamped filename
+                    zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip"
+
+                zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext)
+
+                zip_file = create_zip_file(files, zip_path)
+                if zip_file:
+                    temp_zips.append(zip_file)
+                else:
+                    logger.error(f"Failed to create ZIP file for {email}, skipping")
+                    results["details"].append({
+                        "email": email,
+                        "status": "failed",
+                        "reason": "ZIP creation failed",
+                        "files": files
+                    })
+                    results["failed"] += 1
+                    continue
+
+                # Build the email body
+                email_content = """Hello! Please find today's promo notes (copy plus images) inside the attached archive. For the workflow details, such as mounting products, see the information in the corresponding influencer WeChat group.
+
+"""
+
+                # Send the email
+                if test_mode:
+                    logger.info(f"Test mode: simulating a send to {email}")
+                    results["details"].append({
+                        "email": email,
+                        "status": "success (test mode)",
+                        "files": files
+                    })
+                    results["success"] += 1
+                else:
+                    success = send_single_email(
+                        email_from,
+                        email_password,
+                        email,
+                        subject,
+                        email_content,
+                        [zip_file],
+                        smtp_server,
+                        smtp_port,
+                        use_ssl
+                    )
+
+                    if success:
+                        results["success"] += 1
+                        results["details"].append({
+                            "email": email,
+                            "status": "success",
+                            "files": files
+                        })
+                    else:
+                        results["failed"] += 1
+                        results["details"].append({
+                            "email": email,
+                            "status": "failed",
+                            "reason": "SMTP send failed",
+                            "files": files
+                        })
+
+                # Delay to avoid sending too fast
+                if not test_mode and delay > 0:
+                    logger.info(f"Waiting {delay} seconds before the next email...")
+                    time.sleep(delay)
+
+            except Exception as e:
+                logger.error(f"Error while processing mailbox {email}: {e}")
+                results["failed"] += 1
+                results["details"].append({
+                    "email": email,
+                    "status": "failed",
+                    "reason": str(e),
+                    "files": []
+                })
+
+        # Print a summary of the results
+        logger.info("\n===== Email send summary =====")
+        logger.info(f"Total: {results['total']} mailboxes")
+        logger.info(f"Succeeded: {results['success']} mailboxes")
+        logger.info(f"Failed: {results['failed']} mailboxes")
+
+        # Clean up temporary files
+        if not test_mode:
+            logger.info("Cleaning up temporary ZIP files...")
+            for temp_zip in temp_zips:
+                try:
+                    if os.path.exists(temp_zip):
+                        os.remove(temp_zip)
+                        logger.info(f"Removed temporary file: {temp_zip}")
+                except Exception as e:
+                    logger.warning(f"Failed to remove temporary file: {temp_zip}, error: {e}")
+
+        # Write the results to a CSV file
+        try:
+            result_df = pd.DataFrame(results["details"])
+            result_csv = os.path.join(output_dir,
+                                      f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
+            result_df.to_csv(result_csv, index=False)
+            logger.info(f"Send results saved to: {result_csv}")
+            return result_csv
+        except Exception as e:
+            logger.error(f"Failed to save results to CSV: {e}")
+            return None
+
+    except Exception as e:
+        logger.error(f"Error during the email send process: {e}")
+        return None
+
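+# Caveat worth noting: when --zip-filename is set, every recipient's archive
+# shares the same path under temp_zips/. The file is rebuilt (overwritten)
+# for each recipient just before their email goes out, so attachments stay
+# correct, but the per-recipient archives cannot be inspected afterwards.
+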
+def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir):
+    """Generate the distribution report."""
+    try:
+        # Read the distribution CSV
+        dist_df = pd.read_csv(distribution_csv)
+
+        # Read the send-result CSV
+        if send_result_csv and os.path.exists(send_result_csv):
+            result_df = pd.read_csv(send_result_csv)
+            # Attach the send status
+            email_status = {}
+            for _, row in result_df.iterrows():
+                email_status[row['email']] = row['status']
+
+            dist_df['send_status'] = dist_df['email'].map(email_status)
+        else:
+            dist_df['send_status'] = 'unknown'
+
+        # Per-article distribution statistics
+        article_stats = {}
+        for _, row in dist_df.iterrows():
+            entry_id = row['entry_id']
+            if entry_id not in article_stats:
+                article_stats[entry_id] = {
+                    'total_assigned': 0,
+                    'sent_success': 0,
+                    'sent_failed': 0,
+                    'sent_unknown': 0,
+                    'product': row.get('product', ''),
+                    'object': row.get('object', ''),
+                    'date': row.get('date', ''),
+                    'logic': row.get('logic', '')
+                }
+
+            article_stats[entry_id]['total_assigned'] += 1
+
+            if 'success' in str(row['send_status']).lower():
+                article_stats[entry_id]['sent_success'] += 1
+            elif row['send_status'] in ['failed', 'skipped']:
+                article_stats[entry_id]['sent_failed'] += 1
+            else:
+                article_stats[entry_id]['sent_unknown'] += 1
+
+        # Read the original manifest
+        manifest_df = pd.read_csv(manifest_csv)
+
+        # Build a manifest annotated with distribution status
+        manifest_with_dist = manifest_df.copy()
+        manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map(
+            {k: v['total_assigned'] for k, v in article_stats.items()})
+        manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map(
+            {k: v['sent_success'] for k, v in article_stats.items()})
+
+        # Fill NaN values
+        manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int)
+        manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int)
+
+        # Flag whether each article was distributed
+        manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0
+
+        # Save the report
+        report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
+        manifest_with_dist.to_csv(report_csv, index=False)
+
+        # Build a concise distribution summary report
+        summary_data = []
+        for entry_id, stats in article_stats.items():
+            summary_data.append({
+                'entry_id': entry_id,
+                'product': stats['product'],
+                'object': stats['object'],
+                'date': stats['date'],
+                'logic': stats['logic'],
+                'assigned': stats['total_assigned'],
+                'success': stats['sent_success'],
+                'failed': stats['sent_failed'],
+                'unknown': stats['sent_unknown']
+            })
+
+        if summary_data:
+            summary_df = pd.DataFrame(summary_data)
+            summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
+            summary_df.to_csv(summary_csv, index=False)
+            logger.info(f"Distribution summary saved to: {summary_csv}")
+
+        # Save the per-article statistics
+        stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
+        with open(stats_json, 'w') as f:
+            json.dump(article_stats, f, indent=2)
+
+        # Summary statistics
+        total_articles = len(manifest_df)
+        distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0)
+        success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0)
+
+        logger.info(f"Distribution report saved to: {report_csv}")
+        logger.info(f"Article statistics saved to: {stats_json}")
+        logger.info("\n===== Distribution statistics =====")
+        logger.info(f"Total articles: {total_articles}")
+        logger.info(f"Assigned articles: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
+        logger.info(f"Successfully sent articles: {success_sent} ({success_sent/total_articles*100:.2f}%)")
+
+        # Per-product statistics
+        if 'Product' in manifest_df.columns:
+            product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0)
+            if not product_stats.empty:
+                logger.info("\n===== Distribution by product =====")
+                for product, row in product_stats.iterrows():
+                    if True in row:
+                        distributed = row.get(True, 0)
+                        total = row.sum()
+                        logger.info(f"Product '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)")
+
+        return report_csv
+    except Exception as e:
+        logger.error(f"Failed to generate the distribution report: {e}")
+        return None
+
+def main():
+    args = parse_arguments()
+
+    # Create the output directory
+    os.makedirs(args.output_dir, exist_ok=True)
+
+    # Read the user CSV
+    users_df = read_user_csv(args.user_csv, args.email_column, args.username_column)
+    if users_df is None:
+        logger.error("Could not process the user CSV, exiting")
+        return
+
+    # Read the content manifest CSV
+    content_df = read_manifest_csv(args.manifest_csv, args.judge_only_success)
+    if content_df is None:
+        logger.error("Could not process the manifest CSV, exiting")
+        return
+
+    # Read the previous distribution result, if provided
+    already_sent_ids = None
+    if args.previous_distribution:
+        already_sent_ids = read_previous_distribution(args.previous_distribution, args.skip_sent_success)
+
+    # Allocate content to users
+    users_with_content = allocate_content_to_users(
+        users_df, content_df, args.article_per_user,
+        args.email_column, args.username_column, args.max_send_count,
+        already_sent_ids
+    )
+
+    if not users_with_content:
+        logger.error("Content allocation failed, exiting")
+        return
+
+    # Prepare the distribution CSV
+    distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir)
+    if not distribution_csv:
+        logger.error("Failed to prepare the distribution CSV, exiting")
+        return
+
+    # Send the emails
+    send_result_csv = send_emails(
+        distribution_csv, args.output_dir, args.email_from, args.email_password,
+        args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode,
+        args.delay, args.zip_filename
+    )
+
+    # Generate the distribution report
+    generate_distribution_report(distribution_csv, send_result_csv, args.manifest_csv, args.output_dir)
+
+    logger.info("Content distribution finished")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/scripts/distribution/distribute_example.sh b/scripts/distribution/distribute_example.sh
new file mode 100755
index 0000000..8962e1c
--- /dev/null
+++ b/scripts/distribution/distribute_example.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+# Timestamp variable
+TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
+
+# Path variables
+BASE_DIR="/root/autodl-tmp/TravelContentCreator"
+LOG_DIR="$BASE_DIR/log"
+RESULT_DIR="$BASE_DIR/output/$TIMESTAMP"
+OUTPUT_DIR="$RESULT_DIR/distribution_results"
+# Other variables
+USER_CSV="$BASE_DIR/output/5.12.csv"
+MANIFEST_CSV="$BASE_DIR/output/2025-05-12_18-30-15/manifest_2025-05-12_18-30-15.csv"
+EMAIL_FROM="zwysendemail@163.com"
+EMAIL_PASSWORD="<smtp-auth-code>"  # Use your own SMTP authorization code; do not commit real credentials
+SUBJECT="Xiaohongshu travel promo notes 0512"
+# Previous distribution result file (if any)
+PREVIOUS_DIST=""
+# Archive filename
+ZIP_FILENAME="xhs_travel_promo_notes_0512"
+
+# Create the required directories
+mkdir -p "$LOG_DIR"
+mkdir -p "$OUTPUT_DIR"
+
+# Log to both the console and a log file
+LOG_FILE="$LOG_DIR/distribution_$TIMESTAMP.log"
+exec > >(tee -a "$LOG_FILE") 2>&1
+
+echo "Starting distribution script - $(date)"
+echo "Log saved at: $LOG_FILE"
+echo "Results saved at: $RESULT_DIR"
+
+# Run in test mode (note: the script lives under scripts/distribution/)
+echo "Running in test mode; no emails will actually be sent..."
+python "$BASE_DIR/scripts/distribution/distribute_content.py" \
+    --user-csv "$USER_CSV" \
+    --manifest-csv "$MANIFEST_CSV" \
+    --output-dir "$OUTPUT_DIR" \
+    --email-from "$EMAIL_FROM" \
+    --email-password "$EMAIL_PASSWORD" \
+    --subject "$SUBJECT" \
+    --article-per-user 1 \
+    --judge-only-success \
+    --previous-distribution "$PREVIOUS_DIST" \
+    --skip-sent-success \
+    --zip-filename "$ZIP_FILENAME"
+
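+# On later runs, point PREVIOUS_DIST at the newest distribution CSV so that
+# already-sent articles are skipped (hypothetical path, adjust to your run):
+# PREVIOUS_DIST="$OUTPUT_DIR/distribution_20250513_093459.csv"
+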
+# Command that actually sends the emails (uncomment to enable)
+# echo "Starting the real email send..."
+# python "$BASE_DIR/scripts/distribution/distribute_content.py" \
+#     --user-csv "$USER_CSV" \
+#     --manifest-csv "$MANIFEST_CSV" \
+#     --output-dir "$OUTPUT_DIR" \
+#     --email-from "$EMAIL_FROM" \
+#     --email-password "$EMAIL_PASSWORD" \
+#     --subject "$SUBJECT" \
+#     --article-per-user 3 \
+#     --use-ssl \
+#     --smtp-port 465 \
+#     --judge-only-success \
+#     --max-send-count 10 \
+#     --previous-distribution "$PREVIOUS_DIST" \
+#     --skip-sent-success \
+#     --zip-filename "$ZIP_FILENAME"
+# (--max-send-count 10 caps the send at 10 users; an inline comment after the
+# trailing backslash would break the command, so the note lives here instead.)
+
+# Example without the de-duplication filter
+# python "$BASE_DIR/scripts/distribution/distribute_content.py" \
+#     --user-csv "$USER_CSV" \
+#     --manifest-csv "$MANIFEST_CSV" \
+#     --output-dir "$OUTPUT_DIR" \
+#     --email-from "$EMAIL_FROM" \
+#     --email-password "$EMAIL_PASSWORD" \
+#     --subject "$SUBJECT" \
+#     --article-per-user 3 \
+#     --judge-only-success \
+#     --zip-filename "$ZIP_FILENAME"
+
+echo "Script finished - $(date)"
\ No newline at end of file
diff --git a/scripts/distribution/extract_and_render.py b/scripts/distribution/extract_and_render.py
new file mode 100644
index 0000000..21f9092
--- /dev/null
+++ b/scripts/distribution/extract_and_render.py
@@ -0,0 +1,383 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import json
+import shutil
+import csv
+import traceback
+import re
+import argparse
+
+def convert_json_to_txt_content(json_path, prefer_original=False):
+    """
+    Read a JSON file, extract the title, content, and tags, strip Markdown
+    formatting, and return the formatted text.
+
+    The judge_success field decides whether the original or the reviewed
+    content is used:
+    - if judge_success is True, use title/content (unless prefer_original=True)
+    - if judge_success is False, use original_title/original_content
+
+    Args:
+        json_path: Path to the JSON file
+        prefer_original: Prefer the original content regardless of judge_success
+    """
+    print(f"  - Reading JSON: {json_path}")
+    if not os.path.exists(json_path):
+        print(f"  - Warning: JSON file does not exist: {json_path}")
+        return None, f"File not found: {json_path}"
+
+    try:
+        with open(json_path, 'r', encoding='utf-8') as f_json:
+            data = json.load(f_json)
+
+        # Choose the title and content based on judge_success
+        judge_success = data.get('judge_success', None)
+
+        if prefer_original and 'original_title' in data and 'original_content' in data:
+            # Prefer the original content
+            title = data.get('original_title', 'Original title not found')
+            content = data.get('original_content', 'Original content not found')
+            # Prefer the original tags
+            tags = data.get('original_tags', data.get('tags', 'Tags not found'))
+            print(f"  - Preferring original content (prefer_original=True)")
+        elif judge_success is True and not prefer_original:
+            # Use the reviewed content
+            title = data.get('title', 'Title not found')
+            content = data.get('content', 'Content not found')
+            tags = data.get('tags', 'Tags not found')
+            print(f"  - Using reviewed content (judge_success=True)")
+        elif 'original_title' in data and 'original_content' in data:
+            # Use the original content
+            title = data.get('original_title', 'Original title not found')
+            content = data.get('original_content', 'Original content not found')
+            # Prefer the original tags
+            tags = data.get('original_tags', data.get('tags', 'Tags not found'))
+            print(f"  - Using original content (judge_success={judge_success})")
+        else:
+            # No original_* fields: fall back to the regular fields
+            title = data.get('title', 'Title not found')
+            content = data.get('content', 'Content not found')
+            tags = data.get('tags', 'Tags not found')
+            print(f"  - Using regular content (no judge result)")
+
+        # Work around the duplicated tag/tags fields: per the corrected logic,
+        # only the tags field is used, with tag as a last resort
+        if not tags and 'tag' in data:
+            tags = data.get('tag', 'Tags not found')
+            print(f"  - Using the tag field for tags (to be unified as tags in a later version)")
+
+        # Strip Markdown bold formatting
+        content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
+
+        # Assemble the output text
+        return f"{title}\n\n{content_no_format}\n\n{tags}", None
+    except json.JSONDecodeError:
+        print(f"  - Error: invalid JSON format: {json_path}")
+        return None, f"Invalid JSON format: {json_path}"
+    except Exception as e:
+        print(f"  - Error while processing JSON: {e}")
+        return None, f"Error while processing JSON: {e}"
+
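+# Illustrative article.json shape assumed by the reader above (field names
+# are taken from the code; the values are made up):
+#   {"title": "...", "content": "...", "tags": "#tag1 #tag2",
+#    "original_title": "...", "original_content": "...",
+#    "original_tags": "...", "judge_success": true}
+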
+def load_topic_data(source_dir, run_id):
+    """
+    Load the topic-selection data.
+
+    Args:
+        source_dir: Source directory path
+        run_id: Run ID
+
+    Returns:
+        dict: Topic data keyed by topic_index
+    """
+    topic_file_path = os.path.join(source_dir, f"tweet_topic_{run_id}.json")
+    topic_data = {}
+
+    if os.path.exists(topic_file_path):
+        try:
+            with open(topic_file_path, 'r', encoding='utf-8') as f:
+                topics = json.load(f)
+
+            # Re-key the topic data by index
+            for topic in topics:
+                index = topic.get("index")
+                if index is not None:
+                    # Normalize the key to str so lookups by the i_j directory
+                    # name (a string) match even if the JSON index is an int
+                    topic_data[str(index)] = topic
+
+            print(f"Loaded topic data: {len(topic_data)} entries")
+        except Exception as e:
+            print(f"Error while loading topic data: {e}")
+    else:
+        print(f"Warning: topic file not found: {topic_file_path}")
+
+    return topic_data
+
+def process_result_directory(source_dir, output_dir, run_id=None, prefer_original=False):
+    """
+    Process the given result directory, extracting content and rendering it
+    into the output directory.
+
+    Args:
+        source_dir: Source directory containing i_j subdirectories
+        output_dir: Output directory path
+        run_id: Optional run ID; the source directory name is used if omitted
+        prefer_original: Prefer the original content regardless of judge_success
+    """
+    if not os.path.isdir(source_dir):
+        print(f"Error: source directory does not exist: {source_dir}")
+        return
+
+    # Create the output directory
+    os.makedirs(output_dir, exist_ok=True)
+    print(f"Ensured the output directory exists: {output_dir}")
+
+    # Derive the run_id
+    if not run_id:
+        run_id = os.path.basename(source_dir)
+
+    # Load the topic data
+    topic_data = load_topic_data(source_dir, run_id)
+
+    # Create the CSV manifest, including the topic-related fields
+    csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv")
+    csv_data = [
+        [
+            "EntryID",
+            "TopicIndex",
+            "VariantIndex",
+            "Date",
+            "Logic",
+            "Object",
+            "Product",
+            "ProductLogic",
+            "Style",
+            "StyleLogic",
+            "TargetAudience",
+            "TargetAudienceLogic",
+            "SourcePath",
+            "ArticleJsonPath",
+            "OutputTxtPath",
+            "PosterPath",
+            "AdditionalImagesCount",
+            "Status",
+            "Details",
+            "JudgeStatus",
+            "ContentSource"
+        ]
+    ]
+
+    # Find all i_j directories
+    entry_pattern = re.compile(r"^(\d+)_(\d+)$")
+    entries = []
+
+    for item in os.listdir(source_dir):
+        item_path = os.path.join(source_dir, item)
+        match = entry_pattern.match(item)
+        if os.path.isdir(item_path) and match:
+            entries.append(item)
+
+    if not entries:
+        print(f"Warning: no i_j-style subdirectories found in the source directory")
+        return
+
+    print(f"Found {len(entries)} entry directories")
+
+    # Process each entry
+    for entry in sorted(entries):
+        entry_path = os.path.join(source_dir, entry)
+        output_entry_path = os.path.join(output_dir, entry)
+
+        print(f"\nProcessing entry: {entry}")
+
+        # Parse topic_index and variant_index
+        match = entry_pattern.match(entry)
+        topic_index = match.group(1)
+        variant_index = match.group(2)
+
+        # Get the topic info for this entry
+        topic_info = topic_data.get(topic_index, {})
+
+        # Create the record
+        record = {
+            "EntryID": entry,
+            "TopicIndex": topic_index,
+            "VariantIndex": variant_index,
+            "Date": topic_info.get("date", ""),
+            "Logic": topic_info.get("logic", ""),
+            "Object": topic_info.get("object", ""),
+            "Product": topic_info.get("product", ""),
+            "ProductLogic": topic_info.get("product_logic", ""),
+            "Style": topic_info.get("style", ""),
+            "StyleLogic": topic_info.get("style_logic", ""),
+            "TargetAudience": topic_info.get("target_audience", ""),
+            "TargetAudienceLogic": topic_info.get("target_audience_logic", ""),
+            "SourcePath": entry_path,
+            "ArticleJsonPath": "",
+            "OutputTxtPath": "",
+            "PosterPath": "",
+            "AdditionalImagesCount": 0,
+            "Status": "Processing",
+            "Details": "",
+            "JudgeStatus": "",
+            "ContentSource": "unknown"
+        }
+
+        # Create the output entry directory
+        try:
+            os.makedirs(output_entry_path, exist_ok=True)
+        except Exception as e:
+            record["Status"] = "Failed"
+            record["Details"] = f"Failed to create output directory: {e}"
+            csv_data.append([record[col] for col in csv_data[0]])
+            print(f"  - Error: {record['Details']}")
+            continue
+
+        # 1. Process article.json -> txt
+        json_path = os.path.join(entry_path, "article.json")
+        txt_path = os.path.join(output_entry_path, "article.txt")
+        record["ArticleJsonPath"] = json_path
+        record["OutputTxtPath"] = txt_path
+
+        article_data = {}  # keep defined even if reading article.json fails below
+        if os.path.exists(json_path):
+            # Read article.json
+            try:
+                with open(json_path, 'r', encoding='utf-8') as f_json:
+                    article_data = json.load(f_json)
+                # Extract the judge_success status
+                if "judge_success" in article_data:
+                    record["JudgeStatus"] = str(article_data["judge_success"])
+                elif "judged" in article_data:
+                    record["JudgeStatus"] = "reviewed" if article_data["judged"] else "not reviewed"
+            except Exception as e:
+                print(f"  - Error: failed to read article.json: {e}")
+
+            txt_content, error = convert_json_to_txt_content(json_path, prefer_original)
+            if error:
+                record["Status"] = "Partial"
+                record["Details"] += f"Article processing failed: {error}; "
+                print(f"  - Error: {record['Details']}")
+            else:
+                try:
+                    with open(txt_path, 'w', encoding='utf-8') as f_txt:
+                        f_txt.write(txt_content)
+                    print(f"  - Wrote text file: {txt_path}")
+
+                    # Record the content source
+                    if prefer_original:
+                        record["ContentSource"] = "original_preferred"
+                    elif article_data.get("judge_success") is True:
+                        record["ContentSource"] = "judged"
+                    elif "original_title" in article_data:
+                        record["ContentSource"] = "original"
+                    else:
+                        record["ContentSource"] = "default"
+
+                except Exception as e:
+                    record["Status"] = "Partial"
+                    record["Details"] += f"Failed to write text file: {e}; "
+                    print(f"  - Error: {record['Details']}")
+        else:
+            record["Status"] = "Partial"
+            record["Details"] += "Article JSON file does not exist; "
+            print(f"  - Warning: {record['Details']}")
+
+        # 2. Process the poster image
+        poster_dir = os.path.join(entry_path, "poster")
+        poster_jpg_path = os.path.join(poster_dir, "poster.jpg")
+        output_poster_path = os.path.join(output_entry_path, "poster.jpg")
+        record["PosterPath"] = output_poster_path
+
+        if os.path.exists(poster_jpg_path):
+            try:
+                shutil.copy2(poster_jpg_path, output_poster_path)
+                print(f"  - Copied poster image: {output_poster_path}")
+            except Exception as e:
+                record["Status"] = "Partial"
+                record["Details"] += f"Failed to copy poster image: {e}; "
+                print(f"  - Error: {record['Details']}")
+        else:
+            record["Status"] = "Partial"
+            record["Details"] += "Poster image does not exist; "
+            print(f"  - Warning: {record['Details']}")
+
+        # 3. Process additional images
+        image_dir = os.path.join(entry_path, "image")
+        output_image_dir = os.path.join(output_entry_path, "additional_images")
+
+        if os.path.exists(image_dir) and os.path.isdir(image_dir):
+            try:
+                os.makedirs(output_image_dir, exist_ok=True)
+                image_count = 0
+
+                for filename in os.listdir(image_dir):
+                    if filename.startswith("additional_") and filename.endswith(".jpg"):
+                        source_file = os.path.join(image_dir, filename)
+                        dest_file = os.path.join(output_image_dir, filename)
+
+                        # Copy the image
+                        shutil.copy2(source_file, dest_file)
+                        image_count += 1
+
+                record["AdditionalImagesCount"] = image_count
+                print(f"  - Copied {image_count} additional images to: {output_image_dir}")
+            except Exception as e:
+                record["Status"] = "Partial"
+                record["Details"] += f"Error while processing additional images: {e}; "
+                print(f"  - Error: {record['Details']}")
+        else:
+            record["AdditionalImagesCount"] = 0
+            print(f"  - No additional-images directory found")
+
+        # Update the status
+        if record["Status"] == "Processing":
+            record["Status"] = "Success"
+            record["Details"] = "Processed successfully"
+
+        # Append the record to the CSV data
+        csv_data.append([record[col] for col in csv_data[0]])
+
+    # Write the CSV manifest
+    try:
+        print(f"\nWriting manifest CSV: {csv_path}")
+        with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv:
+            writer = csv.writer(f_csv)
+            writer.writerows(csv_data)
+        print(f"Manifest CSV generated")
+    except Exception as e:
+        print(f"Error while writing the CSV file: {e}")
+        traceback.print_exc()
+
+    print(f"\nDone. Processed {len(entries)} entries.")
+    print(f"Results saved in: {output_dir}")
+
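+# Example invocation (a sketch; these are just the script's built-in default paths):
+#   python scripts/distribution/extract_and_render.py \
+#       --source result/2025-05-12_18-30-15 \
+#       --output output/2025-05-12_18-30-15 --prefer-original
+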
+def main():
+    parser = argparse.ArgumentParser(description="Extract content from a TravelContentCreator result directory and render it into a target directory")
+    parser.add_argument("--source", type=str, help="Source directory path")
+    parser.add_argument("--output", type=str, help="Output directory path")
+    parser.add_argument("--run-id", type=str, help="Custom run ID")
+    parser.add_argument("--prefer-original", action="store_true", help="Prefer the original content, ignoring the review result")
+
+    args = parser.parse_args()
+
+    # Default values
+    source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-12_18-30-15"
+    output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-12_18-30-15"
+    run_id = args.run_id if args.run_id else os.path.basename(source)
+    prefer_original = args.prefer_original
+
+    print("-" * 60)
+    print(f"Starting the extract-and-render process")
+    print(f"Source directory: {source}")
+    print(f"Output directory: {output}")
+    print(f"Run ID: {run_id}")
+    if prefer_original:
+        print("Content mode: prefer original content")
+    else:
+        print("Content mode: choose content based on the review result")
+    print("-" * 60)
+
+    process_result_directory(source, output, run_id, prefer_original)
+
+    print("\nScript finished.")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file