邮箱分发脚本初始化

This commit is contained in:
jinye_huang 2025-05-12 18:39:35 +08:00
parent c8e049fc68
commit b73f00ba95
2 changed files with 750 additions and 0 deletions

View File

@ -0,0 +1,674 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pandas as pd
import argparse
import random
import logging
import subprocess
from datetime import datetime
import json
import smtplib
import zipfile
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='内容分发')
# 必需参数
parser.add_argument('--user-csv', type=str, required=True,
help='小红书用户CSV文件路径')
parser.add_argument('--manifest-csv', type=str, required=True,
help='内容清单CSV文件路径')
parser.add_argument('--output-dir', type=str, default='distribution_results',
help='分发结果输出目录')
parser.add_argument('--email-from', type=str, required=True,
help='发件人邮箱地址')
parser.add_argument('--email-password', type=str, required=True,
help='发件人邮箱授权码')
# 可选参数
parser.add_argument('--article-per-user', type=int, default=3,
help='每个用户分配的文章数量')
parser.add_argument('--max-send-count', type=int, default=None,
help='最大发送数量限制')
parser.add_argument('--subject', type=str, default='您的旅游内容创作已就绪',
help='邮件主题')
parser.add_argument('--smtp-server', type=str, default='smtp.163.com',
help='SMTP服务器地址')
parser.add_argument('--smtp-port', type=int, default=25,
help='SMTP服务器端口')
parser.add_argument('--use-ssl', action='store_true',
help='使用SSL连接SMTP服务器')
parser.add_argument('--email-column', type=str, default='达人邮箱',
help='用户CSV中邮箱列的名称')
parser.add_argument('--username-column', type=str, default='小红书ID',
help='用户CSV中用户名列的名称')
parser.add_argument('--judge-only-success', action='store_true',
help='只分发审核成功的内容')
parser.add_argument('--test-mode', action='store_true',
help='测试模式,不实际发送邮件')
parser.add_argument('--delay', type=int, default=2,
help='每封邮件发送之间的延迟时间(秒)')
parser.add_argument('--previous-distribution', type=str, default=None,
help='上一次分发结果CSV或报告文件路径用于避免重复发送')
parser.add_argument('--skip-sent-success', action='store_true',
help='跳过上次成功发送的文章')
return parser.parse_args()
def read_user_csv(user_csv_path, email_column, username_column):
"""读取用户CSV文件"""
try:
df = pd.read_csv(user_csv_path)
# 检查必要的列是否存在
if email_column not in df.columns:
logger.error(f"用户CSV中缺少邮箱列 '{email_column}'")
return None
# 过滤有效邮箱(非空且包含@符号)
df = df[df[email_column].notna()]
df = df[df[email_column].astype(str).str.contains('@')]
# 获取用户名列,如果不存在则创建默认用户名
if username_column not in df.columns:
logger.warning(f"用户CSV中缺少用户名列 '{username_column}',使用邮箱前缀作为用户名")
df[username_column] = df[email_column].apply(lambda x: x.split('@')[0])
logger.info(f"成功读取 {len(df)} 个有效用户")
return df
except Exception as e:
logger.error(f"读取用户CSV失败: {e}")
return None
def read_manifest_csv(manifest_csv_path, judge_only_success=False):
"""读取内容清单CSV文件"""
try:
df = pd.read_csv(manifest_csv_path)
# 检查必要的列是否存在
required_columns = ['OutputTxtPath', 'PosterPath', 'ArticleJsonPath']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
logger.error(f"内容清单CSV中缺少必要列: {', '.join(missing_columns)}")
return None
# 过滤审核成功的内容
if judge_only_success and 'JudgeStatus' in df.columns:
original_count = len(df)
df = df[df['JudgeStatus'] == True]
logger.info(f"只保留审核成功的内容,从 {original_count} 条过滤为 {len(df)}")
logger.info(f"成功读取 {len(df)} 条内容")
return df
except Exception as e:
logger.error(f"读取内容清单CSV失败: {e}")
return None
def read_previous_distribution(previous_file, skip_sent_success=True):
"""读取上一次的分发结果获取已发放过的文章ID"""
if not previous_file or not os.path.exists(previous_file):
logger.info("未提供上一次分发结果文件或文件不存在,将不过滤已发放文章")
return set()
try:
df = pd.read_csv(previous_file)
# 确定文件类型并提取相关列
if 'entry_id' in df.columns: # distribution.csv 或 email_results.csv
if 'send_status' in df.columns and skip_sent_success:
# 只过滤成功发送的
already_sent = df[df['send_status'].str.contains('success', na=False)]['entry_id'].unique()
else:
# 过滤所有分配过的
already_sent = df['entry_id'].unique()
elif 'EntryID' in df.columns: # manifest_with_dist.csv
if 'sent_success' in df.columns and skip_sent_success:
# 只过滤成功发送的
already_sent = df[df['sent_success'] > 0]['EntryID'].unique()
else:
# 过滤所有分配过的
already_sent = df[df['assigned_count'] > 0]['EntryID'].unique()
elif 'entry_id' in df.columns: # distribution_summary.csv
if 'success' in df.columns and skip_sent_success:
# 只过滤成功发送的
already_sent = df[df['success'] > 0]['entry_id'].unique()
else:
# 过滤所有分配过的
already_sent = df['entry_id'].unique()
else:
logger.warning(f"无法识别的分发结果文件格式: {previous_file}")
return set()
already_sent_set = set(already_sent)
logger.info(f"从上一次分发结果中找到 {len(already_sent_set)} 篇已发放文章")
return already_sent_set
except Exception as e:
logger.error(f"读取上一次分发结果失败: {e}")
return set()
def allocate_content_to_users(users_df, content_df, article_per_user,
email_column, username_column, max_send_count=None,
already_sent_ids=None):
"""为用户分配内容"""
try:
# 创建用户列表
users = []
for _, row in users_df.iterrows():
email = row[email_column]
username = row[username_column] if not pd.isna(row[username_column]) else f"用户_{_}"
users.append({
'email': email,
'username': username,
'contents': []
})
# 转换为记录列表并过滤已发放的文章
content_list = content_df.to_dict('records')
if already_sent_ids:
original_count = len(content_list)
content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids]
filtered_count = original_count - len(content_list)
logger.info(f"过滤掉 {filtered_count} 篇已发放文章,剩余 {len(content_list)} 篇可用")
if not content_list:
logger.warning("过滤后没有可用文章,请提供新内容或关闭过滤功能")
return []
# 随机打乱内容列表
random.shuffle(content_list)
# 为每个用户分配内容
content_allocated = []
content_index = 0
# 限制最大发送数量
if max_send_count is not None and max_send_count > 0:
users = users[:max_send_count]
logger.info(f"限制发送用户数量为 {max_send_count}")
for user in users:
user_contents = []
for _ in range(article_per_user):
if content_index >= len(content_list):
content_index = 0 # 如果内容不够,循环使用
logger.warning("内容不足,将循环使用现有内容")
content = content_list[content_index]
user_contents.append(content)
content_allocated.append(content)
content_index += 1
user['contents'] = user_contents
logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容")
unique_content_count = len(set([c['EntryID'] for c in content_allocated]))
logger.info(f"分配的唯一内容条数: {unique_content_count}")
return users
except Exception as e:
logger.error(f"分配内容失败: {e}")
return []
def prepare_distribution_csv(users_with_content, output_dir):
"""准备分发CSV文件"""
try:
os.makedirs(output_dir, exist_ok=True)
# 准备CSV数据
rows = []
for user in users_with_content:
for content in user['contents']:
rows.append({
'email': user['email'],
'username': user['username'],
'file_path': content['OutputTxtPath'],
'poster_path': content['PosterPath'],
'article_json_path': content['ArticleJsonPath'],
'entry_id': content['EntryID'],
'topic_index': content.get('TopicIndex', ''),
'variant_index': content.get('VariantIndex', ''),
'product': content.get('Product', ''),
'object': content.get('Object', ''),
'date': content.get('Date', ''),
'logic': content.get('Logic', '')
})
# 创建DataFrame并保存
df = pd.DataFrame(rows)
distribution_csv = os.path.join(output_dir, f"distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
df.to_csv(distribution_csv, index=False)
logger.info(f"分发CSV已保存到: {distribution_csv}")
return distribution_csv
except Exception as e:
logger.error(f"准备分发CSV失败: {e}")
return None
def create_zip_file(files, output_path):
"""将文件打包为ZIP"""
try:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in files:
if os.path.exists(file_path):
arcname = os.path.basename(file_path)
zipf.write(file_path, arcname=arcname)
else:
logger.warning(f"文件不存在,跳过: {file_path}")
logger.info(f"成功创建ZIP文件: {output_path}")
return output_path
except Exception as e:
logger.error(f"创建ZIP文件失败: {e}")
return None
def send_single_email(from_addr, password, to_addr, subject, content, attachments,
smtp_server, smtp_port, use_ssl=False):
"""发送单封邮件"""
try:
# 创建邮件对象
message = MIMEMultipart()
message['From'] = Header(from_addr)
message['To'] = Header(to_addr)
message['Subject'] = Header(subject)
# 添加正文
message.attach(MIMEText(content, 'plain', 'utf-8'))
# 添加附件
for attachment in attachments:
if not os.path.exists(attachment):
logger.warning(f"附件不存在,跳过: {attachment}")
continue
with open(attachment, 'rb') as f:
part = MIMEApplication(f.read())
part.add_header('Content-Disposition', 'attachment',
filename=Header(os.path.basename(attachment), 'utf-8').encode())
message.attach(part)
# 连接SMTP服务器并发送
if use_ssl:
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
else:
server = smtplib.SMTP(smtp_server, smtp_port)
server.login(from_addr, password)
server.sendmail(from_addr, to_addr, message.as_string())
server.quit()
logger.info(f"成功发送邮件到: {to_addr}")
return True
except Exception as e:
logger.error(f"发送邮件到 {to_addr} 失败: {e}")
return False
def send_emails(distribution_csv, output_dir, email_from, email_password,
subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2):
"""发送邮件"""
try:
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True)
# 读取分发CSV
df = pd.read_csv(distribution_csv)
logger.info(f"{distribution_csv} 读取了 {len(df)} 条分发记录")
# 按邮箱分组
email_groups = {}
for _, row in df.iterrows():
email = row['email']
if email not in email_groups:
email_groups[email] = []
email_groups[email].append(row)
logger.info(f"共有 {len(email_groups)} 个邮箱需要发送")
# 结果记录
results = {
"total": len(email_groups),
"success": 0,
"failed": 0,
"details": []
}
# 临时ZIP文件列表
temp_zips = []
# 发送邮件
for email, rows in email_groups.items():
try:
logger.info(f"处理邮箱: {email}")
# 收集所有文件路径
files = []
for row in rows:
file_path = row.get('file_path')
if file_path and os.path.exists(file_path):
files.append(file_path)
poster_path = row.get('poster_path')
if poster_path and os.path.exists(poster_path):
files.append(poster_path)
if not files:
logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过")
results["details"].append({
"email": email,
"status": "skipped",
"reason": "没有有效的附件文件",
"files": []
})
results["failed"] += 1
continue
# 创建ZIP文件
zip_filename = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip"
zip_path = os.path.join(output_dir, "temp_zips", zip_filename)
zip_file = create_zip_file(files, zip_path)
if zip_file:
temp_zips.append(zip_file)
else:
logger.error(f"为邮箱 {email} 创建ZIP文件失败跳过")
results["details"].append({
"email": email,
"status": "failed",
"reason": "创建ZIP文件失败",
"files": files
})
results["failed"] += 1
continue
# 构建邮件内容
files_count = len(files)
entries_count = len(rows)
email_content = f"""尊敬的用户:
您好附件中是您的旅游内容创作成果请查收
本次发送了 {entries_count} 篇文章 {files_count} 个文件
祝好
"""
# 发送邮件
if test_mode:
logger.info(f"测试模式: 模拟发送邮件到 {email}")
results["details"].append({
"email": email,
"status": "success (test mode)",
"files": files
})
results["success"] += 1
else:
success = send_single_email(
email_from,
email_password,
email,
subject,
email_content,
[zip_file],
smtp_server,
smtp_port,
use_ssl
)
if success:
results["success"] += 1
results["details"].append({
"email": email,
"status": "success",
"files": files
})
else:
results["failed"] += 1
results["details"].append({
"email": email,
"status": "failed",
"reason": "SMTP发送失败",
"files": files
})
# 添加延迟,避免发送过快
if not test_mode and delay > 0:
logger.info(f"等待 {delay} 秒后继续发送下一封邮件...")
time.sleep(delay)
except Exception as e:
logger.error(f"处理邮箱 {email} 时出错: {e}")
results["failed"] += 1
results["details"].append({
"email": email,
"status": "failed",
"reason": str(e),
"files": []
})
# 打印发送结果摘要
logger.info("\n===== 邮件发送结果摘要 =====")
logger.info(f"总计: {results['total']} 个邮箱")
logger.info(f"成功: {results['success']} 个邮箱")
logger.info(f"失败: {results['failed']} 个邮箱")
# 清理临时文件
if not test_mode:
logger.info("清理临时ZIP文件...")
for temp_zip in temp_zips:
try:
if os.path.exists(temp_zip):
os.remove(temp_zip)
logger.info(f"已删除临时文件: {temp_zip}")
except Exception as e:
logger.warning(f"删除临时文件失败: {temp_zip}, 错误: {e}")
# 将结果写入CSV文件
try:
result_df = pd.DataFrame(results["details"])
result_csv = os.path.join(output_dir,
f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
result_df.to_csv(result_csv, index=False)
logger.info(f"发送结果已保存到: {result_csv}")
return result_csv
except Exception as e:
logger.error(f"保存结果到CSV失败: {e}")
return None
except Exception as e:
logger.error(f"发送邮件过程中发生错误: {e}")
return None
def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir):
"""生成分发报告"""
try:
# 读取分发CSV
dist_df = pd.read_csv(distribution_csv)
# 读取发送结果CSV
if send_result_csv and os.path.exists(send_result_csv):
result_df = pd.read_csv(send_result_csv)
# 添加发送状态
email_status = {}
for _, row in result_df.iterrows():
email_status[row['email']] = row['status']
dist_df['send_status'] = dist_df['email'].map(email_status)
else:
dist_df['send_status'] = 'unknown'
# 统计文章分发情况
article_stats = {}
for _, row in dist_df.iterrows():
entry_id = row['entry_id']
if entry_id not in article_stats:
article_stats[entry_id] = {
'total_assigned': 0,
'sent_success': 0,
'sent_failed': 0,
'sent_unknown': 0,
'product': row.get('product', ''),
'object': row.get('object', ''),
'date': row.get('date', ''),
'logic': row.get('logic', '')
}
article_stats[entry_id]['total_assigned'] += 1
if 'success' in str(row['send_status']).lower():
article_stats[entry_id]['sent_success'] += 1
elif row['send_status'] in ['failed', 'skipped']:
article_stats[entry_id]['sent_failed'] += 1
else:
article_stats[entry_id]['sent_unknown'] += 1
# 读取原始清单
manifest_df = pd.read_csv(manifest_csv)
# 创建带分发状态的清单
manifest_with_dist = manifest_df.copy()
manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map(
{k: v['total_assigned'] for k, v in article_stats.items()})
manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map(
{k: v['sent_success'] for k, v in article_stats.items()})
# 填充NaN值
manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int)
manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int)
# 添加是否被分发的标记
manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0
# 保存报告
report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
manifest_with_dist.to_csv(report_csv, index=False)
# 创建简洁的分发摘要报告
summary_data = []
for entry_id, stats in article_stats.items():
summary_data.append({
'entry_id': entry_id,
'product': stats['product'],
'object': stats['object'],
'date': stats['date'],
'logic': stats['logic'],
'assigned': stats['total_assigned'],
'success': stats['sent_success'],
'failed': stats['sent_failed'],
'unknown': stats['sent_unknown']
})
if summary_data:
summary_df = pd.DataFrame(summary_data)
summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
summary_df.to_csv(summary_csv, index=False)
logger.info(f"分发摘要报告已保存到: {summary_csv}")
# 保存文章统计
stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(stats_json, 'w') as f:
json.dump(article_stats, f, indent=2)
# 统计摘要
total_articles = len(manifest_df)
distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0)
success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0)
logger.info(f"分发报告已保存到: {report_csv}")
logger.info(f"文章统计已保存到: {stats_json}")
logger.info("\n===== 分发统计 =====")
logger.info(f"总文章数: {total_articles}")
logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
logger.info(f"成功发送文章数: {success_sent} ({success_sent/total_articles*100:.2f}%)")
# 按产品统计
if 'Product' in manifest_df.columns:
product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0)
if not product_stats.empty:
logger.info("\n===== 按产品分发统计 =====")
for product, row in product_stats.iterrows():
if True in row:
distributed = row.get(True, 0)
total = row.sum()
logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)")
return report_csv
except Exception as e:
logger.error(f"生成分发报告失败: {e}")
return None
def main():
args = parse_arguments()
# 创建输出目录
os.makedirs(args.output_dir, exist_ok=True)
# 读取用户CSV
users_df = read_user_csv(args.user_csv, args.email_column, args.username_column)
if users_df is None:
logger.error("无法处理用户CSV程序退出")
return
# 读取内容清单CSV
content_df = read_manifest_csv(args.manifest_csv, args.judge_only_success)
if content_df is None:
logger.error("无法处理内容清单CSV程序退出")
return
# 读取上一次分发结果(如果提供)
already_sent_ids = None
if args.previous_distribution:
already_sent_ids = read_previous_distribution(args.previous_distribution, args.skip_sent_success)
# 为用户分配内容
users_with_content = allocate_content_to_users(
users_df, content_df, args.article_per_user,
args.email_column, args.username_column, args.max_send_count,
already_sent_ids
)
if not users_with_content:
logger.error("内容分配失败,程序退出")
return
# 准备分发CSV
distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir)
if not distribution_csv:
logger.error("准备分发CSV失败程序退出")
return
# 发送邮件
send_result_csv = send_emails(
distribution_csv, args.output_dir, args.email_from, args.email_password,
args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode,
args.delay
)
# 生成分发报告
generate_distribution_report(distribution_csv, send_result_csv, args.manifest_csv, args.output_dir)
logger.info("内容分发流程完成")
if __name__ == "__main__":
main()

76
scripts/distribute_example.sh Executable file
View File

@ -0,0 +1,76 @@
#!/bin/bash
# 设置时间戳变量
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
# 设置路径变量
BASE_DIR="/root/autodl-tmp/TravelContentCreator"
LOG_DIR="$BASE_DIR/log"
RESULT_DIR="$BASE_DIR/output/$TIMESTAMP"
OUTPUT_DIR="$RESULT_DIR/distribution_results"
# 设置其他变量
USER_CSV="$BASE_DIR/output/5.12 copy.csv"
MANIFEST_CSV="$BASE_DIR/output/2025-05-12_09-33-12/manifest_2025-05-12_09-33-12.csv"
EMAIL_FROM="zwysendemail@163.com"
EMAIL_PASSWORD="NMhVGFmCJkGEy3B5"
SUBJECT="您的旅游内容创作"
# 上一次分发结果文件(如果存在)
PREVIOUS_DIST="$BASE_DIR/distribution_results/distribution_summary_20250512_183328.csv"
# 创建必要的目录
mkdir -p "$LOG_DIR"
mkdir -p "$OUTPUT_DIR"
# 将日志同时输出到控制台和日志文件
LOG_FILE="$LOG_DIR/distribution_$TIMESTAMP.log"
exec > >(tee -a "$LOG_FILE") 2>&1
echo "开始执行分发脚本 - $(date)"
echo "日志保存在: $LOG_FILE"
echo "结果保存在: $RESULT_DIR"
# 测试模式运行
echo "在测试模式下运行,不会实际发送邮件..."
python scripts/distribute_content.py \
--user-csv "$USER_CSV" \
--manifest-csv "$MANIFEST_CSV" \
--output-dir "$OUTPUT_DIR" \
--email-from "$EMAIL_FROM" \
--email-password "$EMAIL_PASSWORD" \
--subject "$SUBJECT" \
--article-per-user 1 \
--judge-only-success \
--test-mode \
--previous-distribution "$PREVIOUS_DIST" \
--skip-sent-success
# 实际发送邮件的命令(取消注释以启用)
# echo "开始实际发送邮件..."
# python scripts/distribute_content.py \
# --user-csv "$USER_CSV" \
# --manifest-csv "$MANIFEST_CSV" \
# --output-dir "$OUTPUT_DIR" \
# --email-from "$EMAIL_FROM" \
# --email-password "$EMAIL_PASSWORD" \
# --subject "$SUBJECT" \
# --article-per-user 3 \
# --use-ssl \
# --smtp-port 465 \
# --judge-only-success \
# --max-send-count 10 \ # 限制最多发送给10个用户
# --previous-distribution "$PREVIOUS_DIST" \
# --skip-sent-success
# 不使用过滤功能的示例
# python scripts/distribute_content.py \
# --user-csv "$USER_CSV" \
# --manifest-csv "$MANIFEST_CSV" \
# --output-dir "$OUTPUT_DIR" \
# --email-from "$EMAIL_FROM" \
# --email-password "$EMAIL_PASSWORD" \
# --subject "$SUBJECT" \
# --article-per-user 3 \
# --judge-only-success
echo "脚本执行完成 - $(date)"