diff --git a/scripts/distribute_content.py b/scripts/distribute_content.py deleted file mode 100644 index fac3566..0000000 --- a/scripts/distribute_content.py +++ /dev/null @@ -1,723 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import pandas as pd -import argparse -import random -import logging -import subprocess -from datetime import datetime -import json -import smtplib -import zipfile -import time -import glob -from email.mime.text import MIMEText -from email.mime.multipart import MIMEMultipart -from email.mime.application import MIMEApplication -from email.header import Header - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -def parse_arguments(): - """解析命令行参数""" - parser = argparse.ArgumentParser(description='内容分发') - - # 必需参数 - parser.add_argument('--user-csv', type=str, required=True, - help='小红书用户CSV文件路径') - parser.add_argument('--manifest-csv', type=str, required=True, - help='内容清单CSV文件路径') - parser.add_argument('--output-dir', type=str, default='distribution_results', - help='分发结果输出目录') - parser.add_argument('--email-from', type=str, required=True, - help='发件人邮箱地址') - parser.add_argument('--email-password', type=str, required=True, - help='发件人邮箱授权码') - - # 可选参数 - parser.add_argument('--article-per-user', type=int, default=3, - help='每个用户分配的文章数量') - parser.add_argument('--max-send-count', type=int, default=None, - help='最大发送数量限制') - parser.add_argument('--subject', type=str, default='您的旅游内容创作已就绪', - help='邮件主题') - parser.add_argument('--smtp-server', type=str, default='smtp.163.com', - help='SMTP服务器地址') - parser.add_argument('--smtp-port', type=int, default=25, - help='SMTP服务器端口') - parser.add_argument('--use-ssl', action='store_true', - help='使用SSL连接SMTP服务器') - parser.add_argument('--email-column', type=str, default='达人邮箱', - help='用户CSV中邮箱列的名称') - parser.add_argument('--username-column', type=str, default='小红书ID', - help='用户CSV中用户名列的名称') - parser.add_argument('--judge-only-success', action='store_true', - help='只分发审核成功的内容') - parser.add_argument('--test-mode', action='store_true', - help='测试模式,不实际发送邮件') - parser.add_argument('--delay', type=int, default=2, - help='每封邮件发送之间的延迟时间(秒)') - parser.add_argument('--previous-distribution', type=str, default=None, - help='上一次分发结果CSV或报告文件路径,用于避免重复发送') - parser.add_argument('--skip-sent-success', action='store_true', - help='跳过上次成功发送的文章') - parser.add_argument('--zip-filename', type=str, default=None, - help='指定ZIP压缩包的基本文件名(不含扩展名),如"文旅小红书带货笔记内容0512"') - - return parser.parse_args() - -def find_additional_images(file_path): - """查找与文章相关的所有额外图片""" - if not file_path or pd.isna(file_path) or not os.path.exists(file_path): - return [] - - # 获取文章目录 - article_dir = os.path.dirname(file_path) - - # 查找可能的额外图片目录 - additional_images_dir = os.path.join(article_dir, "additional_images") - if not os.path.exists(additional_images_dir): - # 尝试向上一级寻找 - parent_dir = os.path.dirname(article_dir) - additional_images_dir = os.path.join(parent_dir, "additional_images") - if not os.path.exists(additional_images_dir): - return [] - - # 查找所有图片文件 - image_patterns = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp"] - image_files = [] - - for pattern in image_patterns: - image_files.extend(glob.glob(os.path.join(additional_images_dir, pattern))) - - logger.info(f"找到 {len(image_files)} 张额外图片: {additional_images_dir}") - return image_files - -def read_user_csv(user_csv_path, email_column, username_column): - """读取用户CSV文件""" - try: - df = pd.read_csv(user_csv_path) - - # 检查必要的列是否存在 - if email_column not in df.columns: - logger.error(f"用户CSV中缺少邮箱列 '{email_column}'") - return None - - # 过滤有效邮箱(非空且包含@符号) - df = df[df[email_column].notna()] - df = df[df[email_column].astype(str).str.contains('@')] - - # 获取用户名列,如果不存在则创建默认用户名 - if username_column not in df.columns: - logger.warning(f"用户CSV中缺少用户名列 '{username_column}',使用邮箱前缀作为用户名") - df[username_column] = df[email_column].apply(lambda x: x.split('@')[0]) - - logger.info(f"成功读取 {len(df)} 个有效用户") - return df - except Exception as e: - logger.error(f"读取用户CSV失败: {e}") - return None - -def read_manifest_csv(manifest_csv_path, judge_only_success=False): - """读取内容清单CSV文件""" - try: - df = pd.read_csv(manifest_csv_path) - - # 检查必要的列是否存在 - required_columns = ['OutputTxtPath', 'PosterPath', 'ArticleJsonPath'] - missing_columns = [col for col in required_columns if col not in df.columns] - if missing_columns: - logger.error(f"内容清单CSV中缺少必要列: {', '.join(missing_columns)}") - return None - - # 过滤审核成功的内容 - if judge_only_success and 'JudgeStatus' in df.columns: - original_count = len(df) - df = df[df['JudgeStatus'] == True] - logger.info(f"只保留审核成功的内容,从 {original_count} 条过滤为 {len(df)} 条") - - logger.info(f"成功读取 {len(df)} 条内容") - return df - except Exception as e: - logger.error(f"读取内容清单CSV失败: {e}") - return None - -def read_previous_distribution(previous_file, skip_sent_success=True): - """读取上一次的分发结果,获取已发放过的文章ID""" - if not previous_file or not os.path.exists(previous_file): - logger.info("未提供上一次分发结果文件或文件不存在,将不过滤已发放文章") - return set() - - try: - df = pd.read_csv(previous_file) - - # 确定文件类型并提取相关列 - if 'entry_id' in df.columns: # distribution.csv 或 email_results.csv - if 'send_status' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['send_status'].str.contains('success', na=False)]['entry_id'].unique() - else: - # 过滤所有分配过的 - already_sent = df['entry_id'].unique() - elif 'EntryID' in df.columns: # manifest_with_dist.csv - if 'sent_success' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['sent_success'] > 0]['EntryID'].unique() - else: - # 过滤所有分配过的 - already_sent = df[df['assigned_count'] > 0]['EntryID'].unique() - elif 'entry_id' in df.columns: # distribution_summary.csv - if 'success' in df.columns and skip_sent_success: - # 只过滤成功发送的 - already_sent = df[df['success'] > 0]['entry_id'].unique() - else: - # 过滤所有分配过的 - already_sent = df['entry_id'].unique() - else: - logger.warning(f"无法识别的分发结果文件格式: {previous_file}") - return set() - - already_sent_set = set(already_sent) - logger.info(f"从上一次分发结果中找到 {len(already_sent_set)} 篇已发放文章") - return already_sent_set - - except Exception as e: - logger.error(f"读取上一次分发结果失败: {e}") - return set() - -def allocate_content_to_users(users_df, content_df, article_per_user, - email_column, username_column, max_send_count=None, - already_sent_ids=None): - """为用户分配内容""" - try: - # 创建用户列表 - users = [] - for _, row in users_df.iterrows(): - email = row[email_column] - username = row[username_column] if not pd.isna(row[username_column]) else f"用户_{_}" - users.append({ - 'email': email, - 'username': username, - 'contents': [] - }) - - # 转换为记录列表并过滤已发放的文章 - content_list = content_df.to_dict('records') - if already_sent_ids: - original_count = len(content_list) - content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids] - filtered_count = original_count - len(content_list) - logger.info(f"过滤掉 {filtered_count} 篇已发放文章,剩余 {len(content_list)} 篇可用") - - if not content_list: - logger.warning("过滤后没有可用文章,请提供新内容或关闭过滤功能") - return [] - - # 随机打乱内容列表 - random.shuffle(content_list) - - # 为每个用户分配内容 - content_allocated = [] - content_index = 0 - - # 限制最大发送数量 - if max_send_count is not None and max_send_count > 0: - users = users[:max_send_count] - logger.info(f"限制发送用户数量为 {max_send_count}") - - for user in users: - user_contents = [] - for _ in range(article_per_user): - if content_index >= len(content_list): - content_index = 0 # 如果内容不够,循环使用 - logger.warning("内容不足,将循环使用现有内容") - - content = content_list[content_index] - user_contents.append(content) - content_allocated.append(content) - content_index += 1 - - user['contents'] = user_contents - - logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容") - unique_content_count = len(set([c['EntryID'] for c in content_allocated])) - logger.info(f"分配的唯一内容条数: {unique_content_count}") - - return users - except Exception as e: - logger.error(f"分配内容失败: {e}") - return [] - -def prepare_distribution_csv(users_with_content, output_dir): - """准备分发CSV文件""" - try: - os.makedirs(output_dir, exist_ok=True) - - # 准备CSV数据 - rows = [] - for user in users_with_content: - for content in user['contents']: - # 查找额外图片 - output_txt_path = content['OutputTxtPath'] - additional_images = find_additional_images(output_txt_path) - - rows.append({ - 'email': user['email'], - 'username': user['username'], - 'file_path': content['OutputTxtPath'], - 'poster_path': content['PosterPath'], - 'article_json_path': content['ArticleJsonPath'], - 'additional_images': ';'.join(additional_images), # 保存为分号分隔的字符串 - 'entry_id': content['EntryID'], - 'topic_index': content.get('TopicIndex', ''), - 'variant_index': content.get('VariantIndex', ''), - 'product': content.get('Product', ''), - 'object': content.get('Object', ''), - 'date': content.get('Date', ''), - 'logic': content.get('Logic', '') - }) - - # 创建DataFrame并保存 - df = pd.DataFrame(rows) - distribution_csv = os.path.join(output_dir, f"distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") - df.to_csv(distribution_csv, index=False) - - logger.info(f"分发CSV已保存到: {distribution_csv}") - return distribution_csv - except Exception as e: - logger.error(f"准备分发CSV失败: {e}") - return None - -def create_zip_file(files, output_path): - """将文件打包为ZIP""" - try: - os.makedirs(os.path.dirname(output_path), exist_ok=True) - with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - for file_path in files: - if not file_path or pd.isna(file_path): - continue - - if os.path.exists(file_path): - arcname = os.path.basename(file_path) - zipf.write(file_path, arcname=arcname) - else: - logger.warning(f"文件不存在,跳过: {file_path}") - - logger.info(f"成功创建ZIP文件: {output_path}") - return output_path - except Exception as e: - logger.error(f"创建ZIP文件失败: {e}") - return None - -def send_single_email(from_addr, password, to_addr, subject, content, attachments, - smtp_server, smtp_port, use_ssl=False): - """发送单封邮件""" - try: - # 创建邮件对象 - message = MIMEMultipart() - message['From'] = Header(from_addr) - message['To'] = Header(to_addr) - message['Subject'] = Header(subject) - - # 添加正文 - message.attach(MIMEText(content, 'plain', 'utf-8')) - - # 添加附件 - for attachment in attachments: - if not os.path.exists(attachment): - logger.warning(f"附件不存在,跳过: {attachment}") - continue - - with open(attachment, 'rb') as f: - part = MIMEApplication(f.read()) - part.add_header('Content-Disposition', 'attachment', - filename=Header(os.path.basename(attachment), 'utf-8').encode()) - message.attach(part) - - # 连接SMTP服务器并发送 - if use_ssl: - server = smtplib.SMTP_SSL(smtp_server, smtp_port) - else: - server = smtplib.SMTP(smtp_server, smtp_port) - - server.login(from_addr, password) - server.sendmail(from_addr, to_addr, message.as_string()) - server.quit() - - logger.info(f"成功发送邮件到: {to_addr}") - return True - except Exception as e: - logger.error(f"发送邮件到 {to_addr} 失败: {e}") - return False - -def send_emails(distribution_csv, output_dir, email_from, email_password, - subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None): - """发送邮件""" - try: - # 确保输出目录存在 - os.makedirs(output_dir, exist_ok=True) - os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True) - - # 读取分发CSV - df = pd.read_csv(distribution_csv) - logger.info(f"从 {distribution_csv} 读取了 {len(df)} 条分发记录") - - # 按邮箱分组 - email_groups = {} - for _, row in df.iterrows(): - email = row['email'] - if email not in email_groups: - email_groups[email] = [] - email_groups[email].append(row) - - logger.info(f"共有 {len(email_groups)} 个邮箱需要发送") - - # 结果记录 - results = { - "total": len(email_groups), - "success": 0, - "failed": 0, - "details": [] - } - - # 临时ZIP文件列表 - temp_zips = [] - - # 发送邮件 - for email, rows in email_groups.items(): - try: - logger.info(f"处理邮箱: {email}") - - # 收集所有文件路径 - files = [] - for row in rows: - # 收集文本文件 - file_path = row.get('file_path') - if file_path and not pd.isna(file_path) and os.path.exists(file_path): - files.append(file_path) - - # 收集海报图片 - poster_path = row.get('poster_path') - if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path): - files.append(poster_path) - - # 收集所有额外图片 - additional_images = row.get('additional_images', '') - if additional_images and not pd.isna(additional_images): - for img_path in additional_images.split(';'): - if img_path.strip() and os.path.exists(img_path): - files.append(img_path) - - if not files: - logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过") - results["details"].append({ - "email": email, - "status": "skipped", - "reason": "没有有效的附件文件", - "files": [] - }) - results["failed"] += 1 - continue - - # 创建ZIP文件 - if zip_filename: - # 使用指定的文件名 - zip_filename_with_ext = f"{zip_filename}.zip" - else: - # 使用默认的时间戳文件名 - zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip" - - zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext) - - zip_file = create_zip_file(files, zip_path) - if zip_file: - temp_zips.append(zip_file) - else: - logger.error(f"为邮箱 {email} 创建ZIP文件失败,跳过") - results["details"].append({ - "email": email, - "status": "failed", - "reason": "创建ZIP文件失败", - "files": files - }) - results["failed"] += 1 - continue - - # 构建邮件内容 - files_count = len(files) - entries_count = len(rows) - - email_content = f"""您好!请查收今日带货笔记(文案+配图),内容在文件压缩包内。具体挂载商品等操作流程请查看对应达人微信群内信息。 - -""" - - # 发送邮件 - if test_mode: - logger.info(f"测试模式: 模拟发送邮件到 {email}") - results["details"].append({ - "email": email, - "status": "success (test mode)", - "files": files - }) - results["success"] += 1 - else: - success = send_single_email( - email_from, - email_password, - email, - subject, - email_content, - [zip_file], - smtp_server, - smtp_port, - use_ssl - ) - - if success: - results["success"] += 1 - results["details"].append({ - "email": email, - "status": "success", - "files": files - }) - else: - results["failed"] += 1 - results["details"].append({ - "email": email, - "status": "failed", - "reason": "SMTP发送失败", - "files": files - }) - - # 添加延迟,避免发送过快 - if not test_mode and delay > 0: - logger.info(f"等待 {delay} 秒后继续发送下一封邮件...") - time.sleep(delay) - - except Exception as e: - logger.error(f"处理邮箱 {email} 时出错: {e}") - results["failed"] += 1 - results["details"].append({ - "email": email, - "status": "failed", - "reason": str(e), - "files": [] - }) - - # 打印发送结果摘要 - logger.info("\n===== 邮件发送结果摘要 =====") - logger.info(f"总计: {results['total']} 个邮箱") - logger.info(f"成功: {results['success']} 个邮箱") - logger.info(f"失败: {results['failed']} 个邮箱") - - # 清理临时文件 - if not test_mode: - logger.info("清理临时ZIP文件...") - for temp_zip in temp_zips: - try: - if os.path.exists(temp_zip): - os.remove(temp_zip) - logger.info(f"已删除临时文件: {temp_zip}") - except Exception as e: - logger.warning(f"删除临时文件失败: {temp_zip}, 错误: {e}") - - # 将结果写入CSV文件 - try: - result_df = pd.DataFrame(results["details"]) - result_csv = os.path.join(output_dir, - f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") - result_df.to_csv(result_csv, index=False) - logger.info(f"发送结果已保存到: {result_csv}") - return result_csv - except Exception as e: - logger.error(f"保存结果到CSV失败: {e}") - return None - - except Exception as e: - logger.error(f"发送邮件过程中发生错误: {e}") - return None - -def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir): - """生成分发报告""" - try: - # 读取分发CSV - dist_df = pd.read_csv(distribution_csv) - - # 读取发送结果CSV - if send_result_csv and os.path.exists(send_result_csv): - result_df = pd.read_csv(send_result_csv) - # 添加发送状态 - email_status = {} - for _, row in result_df.iterrows(): - email_status[row['email']] = row['status'] - - dist_df['send_status'] = dist_df['email'].map(email_status) - else: - dist_df['send_status'] = 'unknown' - - # 统计文章分发情况 - article_stats = {} - for _, row in dist_df.iterrows(): - entry_id = row['entry_id'] - if entry_id not in article_stats: - article_stats[entry_id] = { - 'total_assigned': 0, - 'sent_success': 0, - 'sent_failed': 0, - 'sent_unknown': 0, - 'product': row.get('product', ''), - 'object': row.get('object', ''), - 'date': row.get('date', ''), - 'logic': row.get('logic', '') - } - - article_stats[entry_id]['total_assigned'] += 1 - - if 'success' in str(row['send_status']).lower(): - article_stats[entry_id]['sent_success'] += 1 - elif row['send_status'] in ['failed', 'skipped']: - article_stats[entry_id]['sent_failed'] += 1 - else: - article_stats[entry_id]['sent_unknown'] += 1 - - # 读取原始清单 - manifest_df = pd.read_csv(manifest_csv) - - # 创建带分发状态的清单 - manifest_with_dist = manifest_df.copy() - manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map( - {k: v['total_assigned'] for k, v in article_stats.items()}) - manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map( - {k: v['sent_success'] for k, v in article_stats.items()}) - - # 填充NaN值 - manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int) - manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int) - - # 添加是否被分发的标记 - manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0 - - # 保存报告 - report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") - manifest_with_dist.to_csv(report_csv, index=False) - - # 创建简洁的分发摘要报告 - summary_data = [] - for entry_id, stats in article_stats.items(): - summary_data.append({ - 'entry_id': entry_id, - 'product': stats['product'], - 'object': stats['object'], - 'date': stats['date'], - 'logic': stats['logic'], - 'assigned': stats['total_assigned'], - 'success': stats['sent_success'], - 'failed': stats['sent_failed'], - 'unknown': stats['sent_unknown'] - }) - - if summary_data: - summary_df = pd.DataFrame(summary_data) - summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") - summary_df.to_csv(summary_csv, index=False) - logger.info(f"分发摘要报告已保存到: {summary_csv}") - - # 保存文章统计 - stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") - with open(stats_json, 'w') as f: - json.dump(article_stats, f, indent=2) - - # 统计摘要 - total_articles = len(manifest_df) - distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0) - success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0) - - logger.info(f"分发报告已保存到: {report_csv}") - logger.info(f"文章统计已保存到: {stats_json}") - logger.info("\n===== 分发统计 =====") - logger.info(f"总文章数: {total_articles}") - logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)") - logger.info(f"成功发送文章数: {success_sent} ({success_sent/total_articles*100:.2f}%)") - - # 按产品统计 - if 'Product' in manifest_df.columns: - product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0) - if not product_stats.empty: - logger.info("\n===== 按产品分发统计 =====") - for product, row in product_stats.iterrows(): - if True in row: - distributed = row.get(True, 0) - total = row.sum() - logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)") - - return report_csv - except Exception as e: - logger.error(f"生成分发报告失败: {e}") - return None - -def main(): - args = parse_arguments() - - # 创建输出目录 - os.makedirs(args.output_dir, exist_ok=True) - - # 读取用户CSV - users_df = read_user_csv(args.user_csv, args.email_column, args.username_column) - if users_df is None: - logger.error("无法处理用户CSV,程序退出") - return - - # 读取内容清单CSV - content_df = read_manifest_csv(args.manifest_csv, args.judge_only_success) - if content_df is None: - logger.error("无法处理内容清单CSV,程序退出") - return - - # 读取上一次分发结果(如果提供) - already_sent_ids = None - if args.previous_distribution: - already_sent_ids = read_previous_distribution(args.previous_distribution, args.skip_sent_success) - - # 为用户分配内容 - users_with_content = allocate_content_to_users( - users_df, content_df, args.article_per_user, - args.email_column, args.username_column, args.max_send_count, - already_sent_ids - ) - - if not users_with_content: - logger.error("内容分配失败,程序退出") - return - - # 准备分发CSV - distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir) - if not distribution_csv: - logger.error("准备分发CSV失败,程序退出") - return - - # 发送邮件 - send_result_csv = send_emails( - distribution_csv, args.output_dir, args.email_from, args.email_password, - args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode, - args.delay, args.zip_filename - ) - - # 生成分发报告 - generate_distribution_report(distribution_csv, send_result_csv, args.manifest_csv, args.output_dir) - - logger.info("内容分发流程完成") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/scripts/distribute_example.sh b/scripts/distribute_example.sh deleted file mode 100755 index d9bf0f0..0000000 --- a/scripts/distribute_example.sh +++ /dev/null @@ -1,80 +0,0 @@ -#!/bin/bash - -# 设置时间戳变量 -TIMESTAMP=$(date +"%Y%m%d_%H%M%S") - -# 设置路径变量 -BASE_DIR="/root/autodl-tmp/TravelContentCreator" -LOG_DIR="$BASE_DIR/log" -RESULT_DIR="$BASE_DIR/output/$TIMESTAMP" -OUTPUT_DIR="$RESULT_DIR/distribution_results" - -# 设置其他变量 -USER_CSV="$BASE_DIR/output/5.12 copy.csv" -MANIFEST_CSV="$BASE_DIR/output/2025-05-12_09-33-12/manifest_2025-05-12_09-33-12.csv" -EMAIL_FROM="zwysendemail@163.com" -EMAIL_PASSWORD="NMhVGFmCJkGEy3B5" -SUBJECT="文旅小红书带货笔记内容0512" -# 上一次分发结果文件(如果存在) -PREVIOUS_DIST="$BASE_DIR/distribution_results/distribution_summary_20250512_183328.csv" -# 压缩包文件名 -ZIP_FILENAME="文旅小红书带货笔记内容0512" - -# 创建必要的目录 -mkdir -p "$LOG_DIR" -mkdir -p "$OUTPUT_DIR" - -# 将日志同时输出到控制台和日志文件 -LOG_FILE="$LOG_DIR/distribution_$TIMESTAMP.log" -exec > >(tee -a "$LOG_FILE") 2>&1 - -echo "开始执行分发脚本 - $(date)" -echo "日志保存在: $LOG_FILE" -echo "结果保存在: $RESULT_DIR" - -# 测试模式运行 -echo "在测试模式下运行,不会实际发送邮件..." -python scripts/distribute_content.py \ - --user-csv "$USER_CSV" \ - --manifest-csv "$MANIFEST_CSV" \ - --output-dir "$OUTPUT_DIR" \ - --email-from "$EMAIL_FROM" \ - --email-password "$EMAIL_PASSWORD" \ - --subject "$SUBJECT" \ - --article-per-user 1 \ - --judge-only-success \ - --previous-distribution "$PREVIOUS_DIST" \ - --skip-sent-success \ - --zip-filename "$ZIP_FILENAME" - -# 实际发送邮件的命令(取消注释以启用) -# echo "开始实际发送邮件..." -# python scripts/distribute_content.py \ -# --user-csv "$USER_CSV" \ -# --manifest-csv "$MANIFEST_CSV" \ -# --output-dir "$OUTPUT_DIR" \ -# --email-from "$EMAIL_FROM" \ -# --email-password "$EMAIL_PASSWORD" \ -# --subject "$SUBJECT" \ -# --article-per-user 3 \ -# --use-ssl \ -# --smtp-port 465 \ -# --judge-only-success \ -# --max-send-count 10 \ # 限制最多发送给10个用户 -# --previous-distribution "$PREVIOUS_DIST" \ -# --skip-sent-success \ -# --zip-filename "$ZIP_FILENAME" - -# 不使用过滤功能的示例 -# python scripts/distribute_content.py \ -# --user-csv "$USER_CSV" \ -# --manifest-csv "$MANIFEST_CSV" \ -# --output-dir "$OUTPUT_DIR" \ -# --email-from "$EMAIL_FROM" \ -# --email-password "$EMAIL_PASSWORD" \ -# --subject "$SUBJECT" \ -# --article-per-user 3 \ -# --judge-only-success \ -# --zip-filename "$ZIP_FILENAME" - -echo "脚本执行完成 - $(date)" \ No newline at end of file diff --git a/scripts/extract_and_render.py b/scripts/extract_and_render.py deleted file mode 100644 index f8d06bc..0000000 --- a/scripts/extract_and_render.py +++ /dev/null @@ -1,383 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import json -import shutil -import csv -import traceback -import re -import argparse -from datetime import datetime - -def convert_json_to_txt_content(json_path, prefer_original=False): - """ - 读取 JSON 文件,提取标题、内容和标签,移除 Markdown 格式, - 并返回格式化文本。 - - 根据judge_success字段决定使用原始内容还是审核后内容: - - judge_success为True时使用title/content(除非prefer_original=True) - - judge_success为False时使用original_title/original_content - - Args: - json_path: JSON文件路径 - prefer_original: 是否优先使用原始内容,无视judge_success结果 - """ - print(f" - 正在读取 JSON: {json_path}") - if not os.path.exists(json_path): - print(f" - 警告: JSON 文件不存在: {json_path}") - return None, f"文件未找到: {json_path}" - - try: - with open(json_path, 'r', encoding='utf-8') as f_json: - data = json.load(f_json) - - # 根据judge_success选择标题和内容 - judge_success = data.get('judge_success', None) - - if prefer_original and 'original_title' in data and 'original_content' in data: - # 优先使用原始内容 - title = data.get('original_title', '未找到原始标题') - content = data.get('original_content', '未找到原始内容') - # 优先使用原始标签 - tags = data.get('original_tags', data.get('tags', '未找到标签')) - print(f" - 优先使用原始内容 (prefer_original=True)") - elif judge_success is True and not prefer_original: - # 使用审核后的内容 - title = data.get('title', '未找到标题') - content = data.get('content', '未找到内容') - tags = data.get('tags', '未找到标签') - print(f" - 使用审核后内容 (judge_success=True)") - elif 'original_title' in data and 'original_content' in data: - # 使用原始内容 - title = data.get('original_title', '未找到原始标题') - content = data.get('original_content', '未找到原始内容') - # 优先使用原始标签 - tags = data.get('original_tags', data.get('tags', '未找到标签')) - print(f" - 使用原始内容 (judge_success={judge_success})") - else: - # 若无original字段,使用常规字段 - title = data.get('title', '未找到标题') - content = data.get('content', '未找到内容') - tags = data.get('tags', '未找到标签') - print(f" - 使用常规内容 (无judge结果)") - - # 解决tag/tags字段重复问题,按照修正后的处理逻辑,只使用tags字段 - if not tags and 'tag' in data: - tags = data.get('tag', '未找到标签') - print(f" - 使用tag字段作为标签 (该字段将在后续版本中统一为tags)") - - # 移除Markdown格式 - content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content) - - # 组合输出文本 - return f"{title}\n\n{content_no_format}\n\n{tags}", None - except json.JSONDecodeError: - print(f" - 错误: JSON 格式无效: {json_path}") - return None, f"无效的 JSON 格式: {json_path}" - except Exception as e: - print(f" - 错误: 处理 JSON 时出错: {e}") - return None, f"处理 JSON 时出错: {e}" - -def load_topic_data(source_dir, run_id): - """ - 加载选题数据 - - Args: - source_dir: 源目录路径 - run_id: 运行ID - - Returns: - dict: 以topic_index为键的选题数据字典 - """ - topic_file_path = os.path.join(source_dir, f"tweet_topic_{run_id}.json") - topic_data = {} - - if os.path.exists(topic_file_path): - try: - with open(topic_file_path, 'r', encoding='utf-8') as f: - topics = json.load(f) - - # 将选题数据转换为以index为键的字典 - for topic in topics: - index = topic.get("index") - if index: - topic_data[index] = topic - - print(f"成功加载选题数据,共{len(topic_data)}条") - except Exception as e: - print(f"加载选题数据时出错: {e}") - else: - print(f"警告: 未找到选题文件: {topic_file_path}") - - return topic_data - -def process_result_directory(source_dir, output_dir, run_id=None, prefer_original=False): - """ - 处理指定的结果目录,提取内容并渲染到输出目录。 - - Args: - source_dir: 源目录路径,包含i_j子目录 - output_dir: 输出目录路径 - run_id: 可选的运行ID,如果不提供则使用源目录名 - prefer_original: 是否优先使用原始内容,无视judge_success结果 - """ - if not os.path.isdir(source_dir): - print(f"错误: 源目录不存在: {source_dir}") - return - - # 创建输出目录 - os.makedirs(output_dir, exist_ok=True) - print(f"确保输出目录存在: {output_dir}") - - # 提取run_id - if not run_id: - run_id = os.path.basename(source_dir) - - # 加载选题数据 - topic_data = load_topic_data(source_dir, run_id) - - # 创建CSV清单,添加选题相关字段 - csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv") - csv_data = [ - [ - "EntryID", - "TopicIndex", - "VariantIndex", - "Date", - "Logic", - "Object", - "Product", - "ProductLogic", - "Style", - "StyleLogic", - "TargetAudience", - "TargetAudienceLogic", - "SourcePath", - "ArticleJsonPath", - "OutputTxtPath", - "PosterPath", - "AdditionalImagesCount", - "Status", - "Details", - "JudgeStatus", - "ContentSource" - ] - ] - - # 查找所有i_j目录 - entry_pattern = re.compile(r"^(\d+)_(\d+)$") - entries = [] - - for item in os.listdir(source_dir): - item_path = os.path.join(source_dir, item) - match = entry_pattern.match(item) - if os.path.isdir(item_path) and match: - entries.append(item) - - if not entries: - print(f"警告: 在源目录中未找到任何i_j格式的子目录") - return - - print(f"找到 {len(entries)} 个条目目录") - - # 处理每个条目 - for entry in sorted(entries): - entry_path = os.path.join(source_dir, entry) - output_entry_path = os.path.join(output_dir, entry) - - print(f"\n处理条目: {entry}") - - # 解析topic_index和variant_index - match = entry_pattern.match(entry) - topic_index = match.group(1) - variant_index = match.group(2) - - # 获取该话题的选题信息 - topic_info = topic_data.get(topic_index, {}) - - # 创建记录 - record = { - "EntryID": entry, - "TopicIndex": topic_index, - "VariantIndex": variant_index, - "Date": topic_info.get("date", ""), - "Logic": topic_info.get("logic", ""), - "Object": topic_info.get("object", ""), - "Product": topic_info.get("product", ""), - "ProductLogic": topic_info.get("product_logic", ""), - "Style": topic_info.get("style", ""), - "StyleLogic": topic_info.get("style_logic", ""), - "TargetAudience": topic_info.get("target_audience", ""), - "TargetAudienceLogic": topic_info.get("target_audience_logic", ""), - "SourcePath": entry_path, - "ArticleJsonPath": "", - "OutputTxtPath": "", - "PosterPath": "", - "AdditionalImagesCount": 0, - "Status": "Processing", - "Details": "", - "JudgeStatus": "", - "ContentSource": "unknown" - } - - # 创建输出条目目录 - try: - os.makedirs(output_entry_path, exist_ok=True) - except Exception as e: - record["Status"] = "Failed" - record["Details"] = f"创建输出目录失败: {e}" - csv_data.append([record[col] for col in csv_data[0]]) - print(f" - 错误: {record['Details']}") - continue - - # 1. 处理article.json -> txt - json_path = os.path.join(entry_path, "article.json") - txt_path = os.path.join(output_entry_path, "article.txt") - record["ArticleJsonPath"] = json_path - record["OutputTxtPath"] = txt_path - - if os.path.exists(json_path): - # 读取article.json - try: - with open(json_path, 'r', encoding='utf-8') as f_json: - article_data = json.load(f_json) - # 提取judge_success状态 - if "judge_success" in article_data: - record["JudgeStatus"] = str(article_data["judge_success"]) - elif "judged" in article_data: - record["JudgeStatus"] = "已审核" if article_data["judged"] else "未审核" - except Exception as e: - print(f" - 错误: 读取article.json失败: {e}") - - txt_content, error = convert_json_to_txt_content(json_path, prefer_original) - if error: - record["Status"] = "Partial" - record["Details"] += f"文章处理失败: {error}; " - print(f" - 错误: {record['Details']}") - else: - try: - with open(txt_path, 'w', encoding='utf-8') as f_txt: - f_txt.write(txt_content) - print(f" - 成功写入文本文件: {txt_path}") - - # 记录内容来源 - if prefer_original: - record["ContentSource"] = "original_preferred" - elif article_data.get("judge_success") is True: - record["ContentSource"] = "judged" - elif "original_title" in article_data: - record["ContentSource"] = "original" - else: - record["ContentSource"] = "default" - - except Exception as e: - record["Status"] = "Partial" - record["Details"] += f"写入文本文件失败: {e}; " - print(f" - 错误: {record['Details']}") - else: - record["Status"] = "Partial" - record["Details"] += "文章JSON文件不存在; " - print(f" - 警告: {record['Details']}") - - # 2. 处理海报图片 - poster_dir = os.path.join(entry_path, "poster") - poster_jpg_path = os.path.join(poster_dir, "poster.jpg") - output_poster_path = os.path.join(output_entry_path, "poster.jpg") - record["PosterPath"] = output_poster_path - - if os.path.exists(poster_jpg_path): - try: - shutil.copy2(poster_jpg_path, output_poster_path) - print(f" - 成功复制海报图片: {output_poster_path}") - except Exception as e: - record["Status"] = "Partial" - record["Details"] += f"复制海报图片失败: {e}; " - print(f" - 错误: {record['Details']}") - else: - record["Status"] = "Partial" - record["Details"] += "海报图片不存在; " - print(f" - 警告: {record['Details']}") - - # 3. 处理额外图片 - image_dir = os.path.join(entry_path, "image") - output_image_dir = os.path.join(output_entry_path, "additional_images") - - if os.path.exists(image_dir) and os.path.isdir(image_dir): - try: - os.makedirs(output_image_dir, exist_ok=True) - image_count = 0 - - for filename in os.listdir(image_dir): - if filename.startswith("additional_") and filename.endswith(".jpg"): - source_file = os.path.join(image_dir, filename) - dest_file = os.path.join(output_image_dir, filename) - - # 复制图片 - shutil.copy2(source_file, dest_file) - image_count += 1 - - record["AdditionalImagesCount"] = image_count - print(f" - 复制了 {image_count} 张额外图片到: {output_image_dir}") - except Exception as e: - record["Status"] = "Partial" - record["Details"] += f"处理额外图片时出错: {e}; " - print(f" - 错误: {record['Details']}") - else: - record["AdditionalImagesCount"] = 0 - print(f" - 没有找到额外图片目录") - - # 更新状态 - if record["Status"] == "Processing": - record["Status"] = "Success" - record["Details"] = "处理成功完成" - - # 添加记录到CSV数据 - csv_data.append([record[col] for col in csv_data[0]]) - - # 写入CSV清单 - try: - print(f"\n正在写入清单CSV: {csv_path}") - with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv: - writer = csv.writer(f_csv) - writer.writerows(csv_data) - print(f"清单CSV生成成功") - except Exception as e: - print(f"写入CSV文件时出错: {e}") - traceback.print_exc() - - print(f"\n处理完成. 共处理 {len(entries)} 个条目.") - print(f"结果保存在: {output_dir}") - -def main(): - parser = argparse.ArgumentParser(description="从TravelContentCreator结果目录提取内容并渲染到指定目录") - parser.add_argument("--source", type=str, help="源目录路径") - parser.add_argument("--output", type=str, help="输出目录路径") - parser.add_argument("--run-id", type=str, help="自定义运行ID") - parser.add_argument("--prefer-original", action="store_true", help="优先使用原始内容,忽略审核结果") - - args = parser.parse_args() - - # 默认值设置 - source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-12_09-33-12" - output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-12_09-33-12" - run_id = args.run_id if args.run_id else os.path.basename(source) - prefer_original = args.prefer_original - - print("-" * 60) - print(f"开始提取和渲染流程") - print(f"源目录: {source}") - print(f"输出目录: {output}") - print(f"运行ID: {run_id}") - if prefer_original: - print("内容模式: 优先使用原始内容") - else: - print("内容模式: 根据审核结果选择内容") - print("-" * 60) - - process_result_directory(source, output, run_id, prefer_original) - - print("\n脚本执行完毕.") - -if __name__ == "__main__": - main() \ No newline at end of file