Consolidated the existing scripts

jinye_huang 2025-05-13 09:35:23 +08:00
parent b2e8ec3bec
commit d2a5fbde7d
3 changed files with 0 additions and 1186 deletions

View File

@@ -1,723 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import pandas as pd
import argparse
import random
import logging
from datetime import datetime
import json
import smtplib
import zipfile
import time
import glob
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header

# Configure logging to both a timestamped file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description='Content distribution')
    # Required arguments
    parser.add_argument('--user-csv', type=str, required=True,
                        help='Path to the Xiaohongshu user CSV file')
    parser.add_argument('--manifest-csv', type=str, required=True,
                        help='Path to the content manifest CSV file')
    parser.add_argument('--output-dir', type=str, default='distribution_results',
                        help='Output directory for distribution results')
    parser.add_argument('--email-from', type=str, required=True,
                        help='Sender email address')
    parser.add_argument('--email-password', type=str, required=True,
                        help='Sender email authorization code')
    # Optional arguments
    parser.add_argument('--article-per-user', type=int, default=3,
                        help='Number of articles assigned to each user')
    parser.add_argument('--max-send-count', type=int, default=None,
                        help='Maximum number of users to send to')
    parser.add_argument('--subject', type=str, default='您的旅游内容创作已就绪',
                        help='Email subject')
    parser.add_argument('--smtp-server', type=str, default='smtp.163.com',
                        help='SMTP server address')
    parser.add_argument('--smtp-port', type=int, default=25,
                        help='SMTP server port')
    parser.add_argument('--use-ssl', action='store_true',
                        help='Connect to the SMTP server over SSL')
    parser.add_argument('--email-column', type=str, default='达人邮箱',
                        help='Name of the email column in the user CSV')
    parser.add_argument('--username-column', type=str, default='小红书ID',
                        help='Name of the username column in the user CSV')
    parser.add_argument('--judge-only-success', action='store_true',
                        help='Distribute only content that passed review')
    parser.add_argument('--test-mode', action='store_true',
                        help='Test mode: do not actually send emails')
    parser.add_argument('--delay', type=int, default=2,
                        help='Delay between emails, in seconds')
    parser.add_argument('--previous-distribution', type=str, default=None,
                        help='Path to a previous distribution result CSV or report, used to avoid duplicate sends')
    parser.add_argument('--skip-sent-success', action='store_true',
                        help='Skip articles that were sent successfully last time')
    parser.add_argument('--zip-filename', type=str, default=None,
                        help='Base filename for the ZIP archive (without extension), e.g. "文旅小红书带货笔记内容0512"')
    return parser.parse_args()

def find_additional_images(file_path):
    """Find all additional images associated with an article."""
    if not file_path or pd.isna(file_path) or not os.path.exists(file_path):
        return []
    # Get the article directory
    article_dir = os.path.dirname(file_path)
    # Look for a possible additional-images directory
    additional_images_dir = os.path.join(article_dir, "additional_images")
    if not os.path.exists(additional_images_dir):
        # Try one level up
        parent_dir = os.path.dirname(article_dir)
        additional_images_dir = os.path.join(parent_dir, "additional_images")
        if not os.path.exists(additional_images_dir):
            return []
    # Collect all image files
    image_patterns = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp"]
    image_files = []
    for pattern in image_patterns:
        image_files.extend(glob.glob(os.path.join(additional_images_dir, pattern)))
    logger.info(f"Found {len(image_files)} additional images in: {additional_images_dir}")
    return image_files

def read_user_csv(user_csv_path, email_column, username_column):
    """Read the user CSV file."""
    try:
        df = pd.read_csv(user_csv_path)
        # Check that the required column exists
        if email_column not in df.columns:
            logger.error(f"User CSV is missing the email column '{email_column}'")
            return None
        # Keep only valid emails (non-empty and containing an @)
        df = df[df[email_column].notna()]
        df = df[df[email_column].astype(str).str.contains('@')]
        # Get the username column; fall back to the email prefix if absent
        if username_column not in df.columns:
            logger.warning(f"User CSV is missing the username column '{username_column}'; using the email prefix as the username")
            df[username_column] = df[email_column].apply(lambda x: x.split('@')[0])
        logger.info(f"Read {len(df)} valid users")
        return df
    except Exception as e:
        logger.error(f"Failed to read user CSV: {e}")
        return None

def read_manifest_csv(manifest_csv_path, judge_only_success=False):
    """Read the content manifest CSV file."""
    try:
        df = pd.read_csv(manifest_csv_path)
        # Check that the required columns exist (EntryID is needed later for
        # allocation and de-duplication)
        required_columns = ['EntryID', 'OutputTxtPath', 'PosterPath', 'ArticleJsonPath']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.error(f"Content manifest CSV is missing required columns: {', '.join(missing_columns)}")
            return None
        # Keep only content that passed review. JudgeStatus is written as the
        # string "True"/"False" by the extraction script, so compare as strings.
        if judge_only_success and 'JudgeStatus' in df.columns:
            original_count = len(df)
            df = df[df['JudgeStatus'].astype(str) == 'True']
            logger.info(f"Keeping only approved content: filtered from {original_count} to {len(df)} entries")
        logger.info(f"Read {len(df)} content entries")
        return df
    except Exception as e:
        logger.error(f"Failed to read content manifest CSV: {e}")
        return None

def read_previous_distribution(previous_file, skip_sent_success=True):
    """Read a previous distribution result to get the IDs of already-distributed articles."""
    if not previous_file or not os.path.exists(previous_file):
        logger.info("No previous distribution result file provided (or it does not exist); already-distributed articles will not be filtered")
        return set()
    try:
        df = pd.read_csv(previous_file)
        # Determine the file type and extract the relevant columns.
        # distribution_summary.csv also has an 'entry_id' column, so its
        # 'success' column must be checked first.
        if 'entry_id' in df.columns and 'success' in df.columns:  # distribution_summary.csv
            if skip_sent_success:
                # Keep only successfully sent articles
                already_sent = df[df['success'] > 0]['entry_id'].unique()
            else:
                # Keep everything that was assigned
                already_sent = df['entry_id'].unique()
        elif 'entry_id' in df.columns:  # distribution.csv or email_results.csv
            if 'send_status' in df.columns and skip_sent_success:
                # Keep only successfully sent articles
                already_sent = df[df['send_status'].str.contains('success', na=False)]['entry_id'].unique()
            else:
                # Keep everything that was assigned
                already_sent = df['entry_id'].unique()
        elif 'EntryID' in df.columns:  # manifest_with_dist.csv
            if 'sent_success' in df.columns and skip_sent_success:
                # Keep only successfully sent articles
                already_sent = df[df['sent_success'] > 0]['EntryID'].unique()
            else:
                # Keep everything that was assigned
                already_sent = df[df['assigned_count'] > 0]['EntryID'].unique()
        else:
            logger.warning(f"Unrecognized distribution result file format: {previous_file}")
            return set()
        already_sent_set = set(already_sent)
        logger.info(f"Found {len(already_sent_set)} already-distributed articles in the previous distribution result")
        return already_sent_set
    except Exception as e:
        logger.error(f"Failed to read previous distribution result: {e}")
        return set()

def allocate_content_to_users(users_df, content_df, article_per_user,
                              email_column, username_column, max_send_count=None,
                              already_sent_ids=None):
    """Allocate content to users."""
    try:
        # Build the user list
        users = []
        for idx, row in users_df.iterrows():
            email = row[email_column]
            username = row[username_column] if not pd.isna(row[username_column]) else f"用户_{idx}"
            users.append({
                'email': email,
                'username': username,
                'contents': []
            })
        # Convert to a list of records and drop already-distributed articles
        content_list = content_df.to_dict('records')
        if already_sent_ids:
            original_count = len(content_list)
            content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids]
            filtered_count = original_count - len(content_list)
            logger.info(f"Filtered out {filtered_count} already-distributed articles; {len(content_list)} remain available")
            if not content_list:
                logger.warning("No articles are available after filtering; provide new content or disable filtering")
                return []
        # Shuffle the content list
        random.shuffle(content_list)
        # Allocate content to each user
        content_allocated = []
        content_index = 0
        # Cap the number of recipients
        if max_send_count is not None and max_send_count > 0:
            users = users[:max_send_count]
            logger.info(f"Limiting the number of recipients to {max_send_count}")
        for user in users:
            user_contents = []
            for _ in range(article_per_user):
                if content_index >= len(content_list):
                    content_index = 0  # Not enough content: wrap around and reuse
                    logger.warning("Not enough content; existing content will be reused cyclically")
                content = content_list[content_index]
                user_contents.append(content)
                content_allocated.append(content)
                content_index += 1
            user['contents'] = user_contents
        logger.info(f"Allocated {len(content_allocated)} content entries to {len(users)} users")
        unique_content_count = len(set(c['EntryID'] for c in content_allocated))
        logger.info(f"Unique content entries allocated: {unique_content_count}")
        return users
    except Exception as e:
        logger.error(f"Failed to allocate content: {e}")
        return []

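# A worked example of the wrap-around allocation above (a sketch, not code
# the script runs): with 2 available articles [A, B] and 3 users at 2
# articles per user, the loop assigns
#   user1 -> [A, B]; the index wraps to 0; user2 -> [A, B]; user3 -> [A, B]
# i.e. 6 assignments but only 2 unique EntryIDs, which is exactly what the
# "Unique content entries allocated" log line makes visible.
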
def prepare_distribution_csv(users_with_content, output_dir):
    """Prepare the distribution CSV file."""
    try:
        os.makedirs(output_dir, exist_ok=True)
        # Build the CSV rows
        rows = []
        for user in users_with_content:
            for content in user['contents']:
                # Find additional images
                output_txt_path = content['OutputTxtPath']
                additional_images = find_additional_images(output_txt_path)
                rows.append({
                    'email': user['email'],
                    'username': user['username'],
                    'file_path': content['OutputTxtPath'],
                    'poster_path': content['PosterPath'],
                    'article_json_path': content['ArticleJsonPath'],
                    'additional_images': ';'.join(additional_images),  # stored as a semicolon-separated string
                    'entry_id': content['EntryID'],
                    'topic_index': content.get('TopicIndex', ''),
                    'variant_index': content.get('VariantIndex', ''),
                    'product': content.get('Product', ''),
                    'object': content.get('Object', ''),
                    'date': content.get('Date', ''),
                    'logic': content.get('Logic', '')
                })
        # Create the DataFrame and save it
        df = pd.DataFrame(rows)
        distribution_csv = os.path.join(output_dir, f"distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
        df.to_csv(distribution_csv, index=False)
        logger.info(f"Distribution CSV saved to: {distribution_csv}")
        return distribution_csv
    except Exception as e:
        logger.error(f"Failed to prepare distribution CSV: {e}")
        return None

def create_zip_file(files, output_path):
    """Package the given files into a ZIP archive."""
    try:
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in files:
                if not file_path or pd.isna(file_path):
                    continue
                if os.path.exists(file_path):
                    arcname = os.path.basename(file_path)
                    zipf.write(file_path, arcname=arcname)
                else:
                    logger.warning(f"File does not exist, skipping: {file_path}")
        logger.info(f"ZIP file created: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"Failed to create ZIP file: {e}")
        return None

def send_single_email(from_addr, password, to_addr, subject, content, attachments,
                      smtp_server, smtp_port, use_ssl=False):
    """Send a single email."""
    try:
        # Build the message
        message = MIMEMultipart()
        message['From'] = Header(from_addr)
        message['To'] = Header(to_addr)
        message['Subject'] = Header(subject)
        # Attach the body
        message.attach(MIMEText(content, 'plain', 'utf-8'))
        # Attach the files
        for attachment in attachments:
            if not os.path.exists(attachment):
                logger.warning(f"Attachment does not exist, skipping: {attachment}")
                continue
            with open(attachment, 'rb') as f:
                part = MIMEApplication(f.read())
                part.add_header('Content-Disposition', 'attachment',
                                filename=Header(os.path.basename(attachment), 'utf-8').encode())
                message.attach(part)
        # Connect to the SMTP server and send; close the connection even on failure
        if use_ssl:
            server = smtplib.SMTP_SSL(smtp_server, smtp_port)
        else:
            server = smtplib.SMTP(smtp_server, smtp_port)
        try:
            server.login(from_addr, password)
            server.sendmail(from_addr, to_addr, message.as_string())
        finally:
            server.quit()
        logger.info(f"Email sent to: {to_addr}")
        return True
    except Exception as e:
        logger.error(f"Failed to send email to {to_addr}: {e}")
        return False

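# Note: this helper speaks either plain SMTP (default, port 25) or implicit
# SSL (--use-ssl, typically port 465). Some providers instead require
# STARTTLS on port 25/587; a variant would look like this sketch (not
# exercised anywhere in this script):
#     server = smtplib.SMTP(smtp_server, 587)
#     server.starttls()  # upgrade to TLS before authenticating
#     server.login(from_addr, password)
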
def send_emails(distribution_csv, output_dir, email_from, email_password,
                subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None):
    """Send the emails."""
    try:
        # Make sure the output directories exist
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True)
        # Read the distribution CSV
        df = pd.read_csv(distribution_csv)
        logger.info(f"Read {len(df)} distribution records from {distribution_csv}")
        # Group by email address
        email_groups = {}
        for _, row in df.iterrows():
            email = row['email']
            if email not in email_groups:
                email_groups[email] = []
            email_groups[email].append(row)
        logger.info(f"{len(email_groups)} mailboxes to send to")
        # Result bookkeeping
        results = {
            "total": len(email_groups),
            "success": 0,
            "failed": 0,
            "details": []
        }
        # Temporary ZIP files
        temp_zips = []
        # Send the emails
        for email, rows in email_groups.items():
            try:
                logger.info(f"Processing mailbox: {email}")
                # Collect every file path for this recipient
                files = []
                for row in rows:
                    # Text file
                    file_path = row.get('file_path')
                    if file_path and not pd.isna(file_path) and os.path.exists(file_path):
                        files.append(file_path)
                    # Poster image
                    poster_path = row.get('poster_path')
                    if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path):
                        files.append(poster_path)
                    # All additional images
                    additional_images = row.get('additional_images', '')
                    if additional_images and not pd.isna(additional_images):
                        for img_path in additional_images.split(';'):
                            if img_path.strip() and os.path.exists(img_path):
                                files.append(img_path)
                if not files:
                    logger.warning(f"Mailbox {email} has no valid attachment files, skipping")
                    results["details"].append({
                        "email": email,
                        "status": "skipped",
                        "reason": "no valid attachment files",
                        "files": []
                    })
                    results["failed"] += 1
                    continue
                # Create the ZIP archive
                if zip_filename:
                    # Use the caller-specified name; note that the same path
                    # is then reused (overwritten) for every recipient
                    zip_filename_with_ext = f"{zip_filename}.zip"
                else:
                    # Default to a timestamped, per-recipient name
                    zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip"
                zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext)
                zip_file = create_zip_file(files, zip_path)
                if zip_file:
                    temp_zips.append(zip_file)
                else:
                    logger.error(f"Failed to create ZIP file for mailbox {email}, skipping")
                    results["details"].append({
                        "email": email,
                        "status": "failed",
                        "reason": "ZIP creation failed",
                        "files": files
                    })
                    results["failed"] += 1
                    continue
                # Build the email body (kept in Chinese: this is the text
                # recipients actually see)
                email_content = """您好!请查收今日带货笔记(文案+配图),内容在文件压缩包内。具体挂载商品等操作流程请查看对应达人微信群内信息。
"""
                # Send the email
                if test_mode:
                    logger.info(f"Test mode: simulating send to {email}")
                    results["details"].append({
                        "email": email,
                        "status": "success (test mode)",
                        "files": files
                    })
                    results["success"] += 1
                else:
                    success = send_single_email(
                        email_from,
                        email_password,
                        email,
                        subject,
                        email_content,
                        [zip_file],
                        smtp_server,
                        smtp_port,
                        use_ssl
                    )
                    if success:
                        results["success"] += 1
                        results["details"].append({
                            "email": email,
                            "status": "success",
                            "files": files
                        })
                    else:
                        results["failed"] += 1
                        results["details"].append({
                            "email": email,
                            "status": "failed",
                            "reason": "SMTP send failed",
                            "files": files
                        })
                # Pause between sends to avoid going too fast
                if not test_mode and delay > 0:
                    logger.info(f"Waiting {delay} seconds before the next email...")
                    time.sleep(delay)
            except Exception as e:
                logger.error(f"Error while processing mailbox {email}: {e}")
                results["failed"] += 1
                results["details"].append({
                    "email": email,
                    "status": "failed",
                    "reason": str(e),
                    "files": []
                })
        # Log a summary of the send results
        logger.info("\n===== Email send summary =====")
        logger.info(f"Total: {results['total']} mailboxes")
        logger.info(f"Succeeded: {results['success']} mailboxes")
        logger.info(f"Failed: {results['failed']} mailboxes")
        # Clean up temporary files
        if not test_mode:
            logger.info("Cleaning up temporary ZIP files...")
            for temp_zip in temp_zips:
                try:
                    if os.path.exists(temp_zip):
                        os.remove(temp_zip)
                        logger.info(f"Deleted temporary file: {temp_zip}")
                except Exception as e:
                    logger.warning(f"Failed to delete temporary file: {temp_zip}, error: {e}")
        # Write the results to a CSV file
        try:
            result_df = pd.DataFrame(results["details"])
            result_csv = os.path.join(output_dir,
                                      f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            result_df.to_csv(result_csv, index=False)
            logger.info(f"Send results saved to: {result_csv}")
            return result_csv
        except Exception as e:
            logger.error(f"Failed to save results to CSV: {e}")
            return None
    except Exception as e:
        logger.error(f"Error during the email sending process: {e}")
        return None

def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir):
    """Generate the distribution report."""
    try:
        # Read the distribution CSV
        dist_df = pd.read_csv(distribution_csv)
        # Read the send-result CSV
        if send_result_csv and os.path.exists(send_result_csv):
            result_df = pd.read_csv(send_result_csv)
            # Attach the send status
            email_status = {}
            for _, row in result_df.iterrows():
                email_status[row['email']] = row['status']
            dist_df['send_status'] = dist_df['email'].map(email_status)
        else:
            dist_df['send_status'] = 'unknown'
        # Tally distribution per article
        article_stats = {}
        for _, row in dist_df.iterrows():
            entry_id = row['entry_id']
            if entry_id not in article_stats:
                article_stats[entry_id] = {
                    'total_assigned': 0,
                    'sent_success': 0,
                    'sent_failed': 0,
                    'sent_unknown': 0,
                    'product': row.get('product', ''),
                    'object': row.get('object', ''),
                    'date': row.get('date', ''),
                    'logic': row.get('logic', '')
                }
            article_stats[entry_id]['total_assigned'] += 1
            if 'success' in str(row['send_status']).lower():
                article_stats[entry_id]['sent_success'] += 1
            elif row['send_status'] in ['failed', 'skipped']:
                article_stats[entry_id]['sent_failed'] += 1
            else:
                article_stats[entry_id]['sent_unknown'] += 1
        # Read the original manifest
        manifest_df = pd.read_csv(manifest_csv)
        # Build a manifest annotated with distribution status
        manifest_with_dist = manifest_df.copy()
        manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map(
            {k: v['total_assigned'] for k, v in article_stats.items()})
        manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map(
            {k: v['sent_success'] for k, v in article_stats.items()})
        # Fill NaN values
        manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int)
        manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int)
        # Flag whether each entry was distributed
        manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0
        # Save the report
        report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
        manifest_with_dist.to_csv(report_csv, index=False)
        # Build a compact distribution summary report
        summary_data = []
        for entry_id, stats in article_stats.items():
            summary_data.append({
                'entry_id': entry_id,
                'product': stats['product'],
                'object': stats['object'],
                'date': stats['date'],
                'logic': stats['logic'],
                'assigned': stats['total_assigned'],
                'success': stats['sent_success'],
                'failed': stats['sent_failed'],
                'unknown': stats['sent_unknown']
            })
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            summary_df.to_csv(summary_csv, index=False)
            logger.info(f"Distribution summary report saved to: {summary_csv}")
        # Save the per-article statistics (ensure_ascii=False keeps Chinese
        # field values readable in the JSON)
        stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(stats_json, 'w', encoding='utf-8') as f:
            json.dump(article_stats, f, indent=2, ensure_ascii=False)
        # Summary statistics
        total_articles = len(manifest_df)
        distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0)
        success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0)
        logger.info(f"Distribution report saved to: {report_csv}")
        logger.info(f"Article statistics saved to: {stats_json}")
        logger.info("\n===== Distribution statistics =====")
        logger.info(f"Total articles: {total_articles}")
        logger.info(f"Assigned articles: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
        logger.info(f"Successfully sent articles: {success_sent} ({success_sent/total_articles*100:.2f}%)")
        # Statistics by product
        if 'Product' in manifest_df.columns:
            product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0)
            if not product_stats.empty:
                logger.info("\n===== Distribution statistics by product =====")
                for product, row in product_stats.iterrows():
                    if True in row:
                        distributed = row.get(True, 0)
                        total = row.sum()
                        logger.info(f"Product '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)")
        return report_csv
    except Exception as e:
        logger.error(f"Failed to generate distribution report: {e}")
        return None

def main():
    args = parse_arguments()
    # Create the output directory
    os.makedirs(args.output_dir, exist_ok=True)
    # Read the user CSV
    users_df = read_user_csv(args.user_csv, args.email_column, args.username_column)
    if users_df is None:
        logger.error("Could not process the user CSV, exiting")
        return
    # Read the content manifest CSV
    content_df = read_manifest_csv(args.manifest_csv, args.judge_only_success)
    if content_df is None:
        logger.error("Could not process the content manifest CSV, exiting")
        return
    # Read the previous distribution result, if provided
    already_sent_ids = None
    if args.previous_distribution:
        already_sent_ids = read_previous_distribution(args.previous_distribution, args.skip_sent_success)
    # Allocate content to users
    users_with_content = allocate_content_to_users(
        users_df, content_df, args.article_per_user,
        args.email_column, args.username_column, args.max_send_count,
        already_sent_ids
    )
    if not users_with_content:
        logger.error("Content allocation failed, exiting")
        return
    # Prepare the distribution CSV
    distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir)
    if not distribution_csv:
        logger.error("Failed to prepare the distribution CSV, exiting")
        return
    # Send the emails
    send_result_csv = send_emails(
        distribution_csv, args.output_dir, args.email_from, args.email_password,
        args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode,
        args.delay, args.zip_filename
    )
    # Generate the distribution report
    generate_distribution_report(distribution_csv, send_result_csv, args.manifest_csv, args.output_dir)
    logger.info("Content distribution pipeline complete")

if __name__ == "__main__":
    main()
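
A minimal dry-run invocation of the script above (a sketch: the scripts/distribute_content.py path comes from the shell script below, and the two CSV paths are placeholders):

    python scripts/distribute_content.py \
        --user-csv users.csv \
        --manifest-csv manifest.csv \
        --email-from sender@163.com \
        --email-password "smtp-auth-code" \
        --judge-only-success \
        --test-mode

With --test-mode, the full allocation, ZIP-packaging, and reporting pipeline runs without ever opening an SMTP connection.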

View File

@@ -1,80 +0,0 @@
#!/bin/bash
# Timestamp variable
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
# Path variables
BASE_DIR="/root/autodl-tmp/TravelContentCreator"
LOG_DIR="$BASE_DIR/log"
RESULT_DIR="$BASE_DIR/output/$TIMESTAMP"
OUTPUT_DIR="$RESULT_DIR/distribution_results"
# Other variables
USER_CSV="$BASE_DIR/output/5.12 copy.csv"
MANIFEST_CSV="$BASE_DIR/output/2025-05-12_09-33-12/manifest_2025-05-12_09-33-12.csv"
EMAIL_FROM="zwysendemail@163.com"
EMAIL_PASSWORD="NMhVGFmCJkGEy3B5"
SUBJECT="文旅小红书带货笔记内容0512"
# Previous distribution result file (if it exists)
PREVIOUS_DIST="$BASE_DIR/distribution_results/distribution_summary_20250512_183328.csv"
# ZIP archive filename
ZIP_FILENAME="文旅小红书带货笔记内容0512"
# Create the required directories
mkdir -p "$LOG_DIR"
mkdir -p "$OUTPUT_DIR"
# Send output to both the console and the log file
LOG_FILE="$LOG_DIR/distribution_$TIMESTAMP.log"
exec > >(tee -a "$LOG_FILE") 2>&1
echo "Starting distribution script - $(date)"
echo "Log saved at: $LOG_FILE"
echo "Results saved at: $RESULT_DIR"
# Run in test mode
echo "Running in test mode; no emails will actually be sent..."
python scripts/distribute_content.py \
    --user-csv "$USER_CSV" \
    --manifest-csv "$MANIFEST_CSV" \
    --output-dir "$OUTPUT_DIR" \
    --email-from "$EMAIL_FROM" \
    --email-password "$EMAIL_PASSWORD" \
    --subject "$SUBJECT" \
    --article-per-user 1 \
    --judge-only-success \
    --previous-distribution "$PREVIOUS_DIST" \
    --skip-sent-success \
    --zip-filename "$ZIP_FILENAME"
# Command that actually sends emails (uncomment to enable).
# --max-send-count 10 limits the send to at most 10 users; keep comments off
# the continuation lines so the backslashes are not broken.
# python scripts/distribute_content.py \
#     --user-csv "$USER_CSV" \
#     --manifest-csv "$MANIFEST_CSV" \
#     --output-dir "$OUTPUT_DIR" \
#     --email-from "$EMAIL_FROM" \
#     --email-password "$EMAIL_PASSWORD" \
#     --subject "$SUBJECT" \
#     --article-per-user 3 \
#     --use-ssl \
#     --smtp-port 465 \
#     --judge-only-success \
#     --max-send-count 10 \
#     --previous-distribution "$PREVIOUS_DIST" \
#     --skip-sent-success \
#     --zip-filename "$ZIP_FILENAME"
# Example without the de-duplication filter
# python scripts/distribute_content.py \
#     --user-csv "$USER_CSV" \
#     --manifest-csv "$MANIFEST_CSV" \
#     --output-dir "$OUTPUT_DIR" \
#     --email-from "$EMAIL_FROM" \
#     --email-password "$EMAIL_PASSWORD" \
#     --subject "$SUBJECT" \
#     --article-per-user 3 \
#     --judge-only-success \
#     --zip-filename "$ZIP_FILENAME"
echo "Script finished - $(date)"

View File

@@ -1,383 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
import shutil
import csv
import traceback
import re
import argparse

def convert_json_to_txt_content(json_path, prefer_original=False):
    """
    Read a JSON file, extract the title, content, and tags, strip Markdown
    formatting, and return the formatted text.
    The judge_success field decides whether the original or the reviewed
    content is used:
    - judge_success is True: use title/content (unless prefer_original=True)
    - judge_success is False: use original_title/original_content
    Args:
        json_path: path to the JSON file
        prefer_original: prefer the original content regardless of the
            judge_success result
    """
    print(f" - Reading JSON: {json_path}")
    if not os.path.exists(json_path):
        print(f" - Warning: JSON file does not exist: {json_path}")
        return None, f"File not found: {json_path}"
    try:
        with open(json_path, 'r', encoding='utf-8') as f_json:
            data = json.load(f_json)
        # Choose the title and content based on judge_success
        judge_success = data.get('judge_success', None)
        if prefer_original and 'original_title' in data and 'original_content' in data:
            # Prefer the original content
            title = data.get('original_title', '未找到原始标题')
            content = data.get('original_content', '未找到原始内容')
            # Prefer the original tags
            tags = data.get('original_tags', data.get('tags', '未找到标签'))
            print(" - Preferring original content (prefer_original=True)")
        elif judge_success is True and not prefer_original:
            # Use the reviewed content
            title = data.get('title', '未找到标题')
            content = data.get('content', '未找到内容')
            tags = data.get('tags', '未找到标签')
            print(" - Using reviewed content (judge_success=True)")
        elif 'original_title' in data and 'original_content' in data:
            # Use the original content
            title = data.get('original_title', '未找到原始标题')
            content = data.get('original_content', '未找到原始内容')
            # Prefer the original tags
            tags = data.get('original_tags', data.get('tags', '未找到标签'))
            print(f" - Using original content (judge_success={judge_success})")
        else:
            # No original_* fields: fall back to the regular fields
            title = data.get('title', '未找到标题')
            content = data.get('content', '未找到内容')
            tags = data.get('tags', '未找到标签')
            print(" - Using regular content (no judge result)")
        # Work around the duplicated tag/tags fields: per the corrected logic,
        # only the tags field is used, with tag as a fallback
        if not tags and 'tag' in data:
            tags = data.get('tag', '未找到标签')
            print(" - Using the tag field for tags (to be unified as tags in a later version)")
        # Strip Markdown bold formatting
        content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
        # Assemble the output text
        return f"{title}\n\n{content_no_format}\n\n{tags}", None
    except json.JSONDecodeError:
        print(f" - Error: invalid JSON format: {json_path}")
        return None, f"Invalid JSON format: {json_path}"
    except Exception as e:
        print(f" - Error while processing JSON: {e}")
        return None, f"Error while processing JSON: {e}"

def load_topic_data(source_dir, run_id):
    """
    Load the topic-selection data.
    Args:
        source_dir: path to the source directory
        run_id: run ID
    Returns:
        dict: topic data keyed by topic_index (as a string)
    """
    topic_file_path = os.path.join(source_dir, f"tweet_topic_{run_id}.json")
    topic_data = {}
    if os.path.exists(topic_file_path):
        try:
            with open(topic_file_path, 'r', encoding='utf-8') as f:
                topics = json.load(f)
            # Convert the topic list into a dict keyed by index; keys are
            # stored as strings so they match the directory-derived indices
            for topic in topics:
                index = topic.get("index")
                if index is not None:
                    topic_data[str(index)] = topic
            print(f"Topic data loaded: {len(topic_data)} topics")
        except Exception as e:
            print(f"Error while loading topic data: {e}")
    else:
        print(f"Warning: topic file not found: {topic_file_path}")
    return topic_data

def process_result_directory(source_dir, output_dir, run_id=None, prefer_original=False):
    """
    Process the given result directory, extract its content, and render it
    into the output directory.
    Args:
        source_dir: source directory path (contains i_j subdirectories)
        output_dir: output directory path
        run_id: optional run ID; defaults to the source directory name
        prefer_original: prefer the original content regardless of the
            judge_success result
    """
    if not os.path.isdir(source_dir):
        print(f"Error: source directory does not exist: {source_dir}")
        return
    # Create the output directory
    os.makedirs(output_dir, exist_ok=True)
    print(f"Ensured output directory exists: {output_dir}")
    # Derive the run_id
    if not run_id:
        run_id = os.path.basename(source_dir)
    # Load the topic data
    topic_data = load_topic_data(source_dir, run_id)
    # Create the CSV manifest, including the topic-related fields
    csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv")
    csv_data = [
        [
            "EntryID",
            "TopicIndex",
            "VariantIndex",
            "Date",
            "Logic",
            "Object",
            "Product",
            "ProductLogic",
            "Style",
            "StyleLogic",
            "TargetAudience",
            "TargetAudienceLogic",
            "SourcePath",
            "ArticleJsonPath",
            "OutputTxtPath",
            "PosterPath",
            "AdditionalImagesCount",
            "Status",
            "Details",
            "JudgeStatus",
            "ContentSource"
        ]
    ]
    # Find all i_j directories
    entry_pattern = re.compile(r"^(\d+)_(\d+)$")
    entries = []
    for item in os.listdir(source_dir):
        item_path = os.path.join(source_dir, item)
        match = entry_pattern.match(item)
        if os.path.isdir(item_path) and match:
            entries.append(item)
    if not entries:
        print("Warning: no i_j-style subdirectories found in the source directory")
        return
    print(f"Found {len(entries)} entry directories")
    # Process each entry
    for entry in sorted(entries):
        entry_path = os.path.join(source_dir, entry)
        output_entry_path = os.path.join(output_dir, entry)
        print(f"\nProcessing entry: {entry}")
        # Parse topic_index and variant_index
        match = entry_pattern.match(entry)
        topic_index = match.group(1)
        variant_index = match.group(2)
        # Look up the topic info for this entry
        topic_info = topic_data.get(topic_index, {})
        # Build the record
        record = {
            "EntryID": entry,
            "TopicIndex": topic_index,
            "VariantIndex": variant_index,
            "Date": topic_info.get("date", ""),
            "Logic": topic_info.get("logic", ""),
            "Object": topic_info.get("object", ""),
            "Product": topic_info.get("product", ""),
            "ProductLogic": topic_info.get("product_logic", ""),
            "Style": topic_info.get("style", ""),
            "StyleLogic": topic_info.get("style_logic", ""),
            "TargetAudience": topic_info.get("target_audience", ""),
            "TargetAudienceLogic": topic_info.get("target_audience_logic", ""),
            "SourcePath": entry_path,
            "ArticleJsonPath": "",
            "OutputTxtPath": "",
            "PosterPath": "",
            "AdditionalImagesCount": 0,
            "Status": "Processing",
            "Details": "",
            "JudgeStatus": "",
            "ContentSource": "unknown"
        }
        # Create the output entry directory
        try:
            os.makedirs(output_entry_path, exist_ok=True)
        except Exception as e:
            record["Status"] = "Failed"
            record["Details"] = f"Failed to create output directory: {e}"
            csv_data.append([record[col] for col in csv_data[0]])
            print(f" - Error: {record['Details']}")
            continue
        # 1. Convert article.json -> txt
        json_path = os.path.join(entry_path, "article.json")
        txt_path = os.path.join(output_entry_path, "article.txt")
        record["ArticleJsonPath"] = json_path
        record["OutputTxtPath"] = txt_path
        if os.path.exists(json_path):
            # Read article.json; start from an empty dict so article_data is
            # defined even if this read fails
            article_data = {}
            try:
                with open(json_path, 'r', encoding='utf-8') as f_json:
                    article_data = json.load(f_json)
                # Extract the judge_success status
                if "judge_success" in article_data:
                    record["JudgeStatus"] = str(article_data["judge_success"])
                elif "judged" in article_data:
                    record["JudgeStatus"] = "已审核" if article_data["judged"] else "未审核"
            except Exception as e:
                print(f" - Error: failed to read article.json: {e}")
            txt_content, error = convert_json_to_txt_content(json_path, prefer_original)
            if error:
                record["Status"] = "Partial"
                record["Details"] += f"Article processing failed: {error}; "
                print(f" - Error: {record['Details']}")
            else:
                try:
                    with open(txt_path, 'w', encoding='utf-8') as f_txt:
                        f_txt.write(txt_content)
                    print(f" - Wrote text file: {txt_path}")
                    # Record the content source
                    if prefer_original:
                        record["ContentSource"] = "original_preferred"
                    elif article_data.get("judge_success") is True:
                        record["ContentSource"] = "judged"
                    elif "original_title" in article_data:
                        record["ContentSource"] = "original"
                    else:
                        record["ContentSource"] = "default"
                except Exception as e:
                    record["Status"] = "Partial"
                    record["Details"] += f"Failed to write text file: {e}; "
                    print(f" - Error: {record['Details']}")
        else:
            record["Status"] = "Partial"
            record["Details"] += "Article JSON file does not exist; "
            print(f" - Warning: {record['Details']}")
        # 2. Copy the poster image
        poster_dir = os.path.join(entry_path, "poster")
        poster_jpg_path = os.path.join(poster_dir, "poster.jpg")
        output_poster_path = os.path.join(output_entry_path, "poster.jpg")
        record["PosterPath"] = output_poster_path
        if os.path.exists(poster_jpg_path):
            try:
                shutil.copy2(poster_jpg_path, output_poster_path)
                print(f" - Copied poster image: {output_poster_path}")
            except Exception as e:
                record["Status"] = "Partial"
                record["Details"] += f"Failed to copy poster image: {e}; "
                print(f" - Error: {record['Details']}")
        else:
            record["Status"] = "Partial"
            record["Details"] += "Poster image does not exist; "
            print(f" - Warning: {record['Details']}")
        # 3. Copy the additional images
        image_dir = os.path.join(entry_path, "image")
        output_image_dir = os.path.join(output_entry_path, "additional_images")
        if os.path.exists(image_dir) and os.path.isdir(image_dir):
            try:
                os.makedirs(output_image_dir, exist_ok=True)
                image_count = 0
                for filename in os.listdir(image_dir):
                    if filename.startswith("additional_") and filename.endswith(".jpg"):
                        source_file = os.path.join(image_dir, filename)
                        dest_file = os.path.join(output_image_dir, filename)
                        # Copy the image
                        shutil.copy2(source_file, dest_file)
                        image_count += 1
                record["AdditionalImagesCount"] = image_count
                print(f" - Copied {image_count} additional images to: {output_image_dir}")
            except Exception as e:
                record["Status"] = "Partial"
                record["Details"] += f"Error while processing additional images: {e}; "
                print(f" - Error: {record['Details']}")
        else:
            record["AdditionalImagesCount"] = 0
            print(" - No additional-images directory found")
        # Finalize the status
        if record["Status"] == "Processing":
            record["Status"] = "Success"
            record["Details"] = "Processed successfully"
        # Append the record to the CSV data
        csv_data.append([record[col] for col in csv_data[0]])
    # Write the CSV manifest
    try:
        print(f"\nWriting manifest CSV: {csv_path}")
        with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv:
            writer = csv.writer(f_csv)
            writer.writerows(csv_data)
        print("Manifest CSV generated successfully")
    except Exception as e:
        print(f"Error while writing the CSV file: {e}")
        traceback.print_exc()
    print(f"\nProcessing complete. Handled {len(entries)} entries.")
    print(f"Results saved in: {output_dir}")

def main():
    parser = argparse.ArgumentParser(description="Extract content from a TravelContentCreator result directory and render it to a target directory")
    parser.add_argument("--source", type=str, help="Source directory path")
    parser.add_argument("--output", type=str, help="Output directory path")
    parser.add_argument("--run-id", type=str, help="Custom run ID")
    parser.add_argument("--prefer-original", action="store_true", help="Prefer the original content and ignore the review result")
    args = parser.parse_args()
    # Default values
    source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-12_09-33-12"
    output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-12_09-33-12"
    run_id = args.run_id if args.run_id else os.path.basename(source)
    prefer_original = args.prefer_original
    print("-" * 60)
    print("Starting the extract-and-render pipeline")
    print(f"Source directory: {source}")
    print(f"Output directory: {output}")
    print(f"Run ID: {run_id}")
    if prefer_original:
        print("Content mode: prefer original content")
    else:
        print("Content mode: choose content based on the review result")
    print("-" * 60)
    process_result_directory(source, output, run_id, prefer_original)
    print("\nScript finished.")

if __name__ == "__main__":
    main()
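
Assuming this file is saved as scripts/extract_and_render.py (a placeholder name; the original filename is not shown in this diff), a typical invocation mirroring the defaults above would be:

    python scripts/extract_and_render.py \
        --source /root/autodl-tmp/TravelContentCreator/result/2025-05-12_09-33-12 \
        --output /root/autodl-tmp/TravelContentCreator/output/2025-05-12_09-33-12

The manifest_<run_id>.csv it writes is the file the distribution script consumes via --manifest-csv.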