723 lines
30 KiB
Python
723 lines
30 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import sys
|
||
import pandas as pd
|
||
import argparse
|
||
import random
|
||
import logging
|
||
import subprocess
|
||
from datetime import datetime
|
||
import json
|
||
import smtplib
|
||
import zipfile
|
||
import time
|
||
import glob
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.application import MIMEApplication
|
||
from email.header import Header
|
||
|
||
# Configure logging: every record is written both to a timestamped log file
# (relative path, so it is created in the current working directory) and to a
# stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"content_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
|
||
|
||
def parse_arguments(argv=None):
    """Parse the command-line options of the content distribution script.

    Args:
        argv: Optional list of argument strings. With the default ``None``
            argparse reads ``sys.argv[1:]``, which is the original behaviour;
            passing a list allows the parser to be driven programmatically.

    Returns:
        argparse.Namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser(description='内容分发')

    # Main inputs. All are required except --output-dir, which has a default.
    parser.add_argument('--user-csv', type=str, required=True,
                        help='小红书用户CSV文件路径')
    parser.add_argument('--manifest-csv', type=str, required=True,
                        help='内容清单CSV文件路径')
    parser.add_argument('--output-dir', type=str, default='distribution_results',
                        help='分发结果输出目录')
    parser.add_argument('--email-from', type=str, required=True,
                        help='发件人邮箱地址')
    parser.add_argument('--email-password', type=str, required=True,
                        help='发件人邮箱授权码')

    # Optional settings.
    parser.add_argument('--article-per-user', type=int, default=3,
                        help='每个用户分配的文章数量')
    parser.add_argument('--max-send-count', type=int, default=None,
                        help='最大发送数量限制')
    parser.add_argument('--subject', type=str, default='您的旅游内容创作已就绪',
                        help='邮件主题')
    parser.add_argument('--smtp-server', type=str, default='smtp.163.com',
                        help='SMTP服务器地址')
    parser.add_argument('--smtp-port', type=int, default=25,
                        help='SMTP服务器端口')
    parser.add_argument('--use-ssl', action='store_true',
                        help='使用SSL连接SMTP服务器')
    parser.add_argument('--email-column', type=str, default='达人邮箱',
                        help='用户CSV中邮箱列的名称')
    parser.add_argument('--username-column', type=str, default='小红书ID',
                        help='用户CSV中用户名列的名称')
    parser.add_argument('--judge-only-success', action='store_true',
                        help='只分发审核成功的内容')
    parser.add_argument('--test-mode', action='store_true',
                        help='测试模式,不实际发送邮件')
    parser.add_argument('--delay', type=int, default=2,
                        help='每封邮件发送之间的延迟时间(秒)')
    parser.add_argument('--previous-distribution', type=str, default=None,
                        help='上一次分发结果CSV或报告文件路径,用于避免重复发送')
    parser.add_argument('--skip-sent-success', action='store_true',
                        help='跳过上次成功发送的文章')
    parser.add_argument('--zip-filename', type=str, default=None,
                        help='指定ZIP压缩包的基本文件名(不含扩展名),如"文旅小红书带货笔记内容0512"')

    return parser.parse_args(argv)
|
||
|
||
def find_additional_images(file_path):
    """Return the extra image files that accompany an article.

    The images are looked up in a folder named ``additional_images`` located
    next to the article file or, failing that, one directory level higher.

    Args:
        file_path: Path of the article text file. ``None``, ``NaN``, an empty
            string or a path that does not exist yields an empty list.

    Returns:
        Sorted list of image paths with a ``.jpg``, ``.jpeg``, ``.png``,
        ``.gif`` or ``.webp`` extension (matched case-insensitively). Empty
        when no ``additional_images`` folder is found.
    """
    if not file_path or pd.isna(file_path) or not os.path.exists(file_path):
        return []

    article_dir = os.path.dirname(file_path)
    # First choice: folder beside the article; fallback: one level up.
    candidate_dirs = (
        os.path.join(article_dir, "additional_images"),
        os.path.join(os.path.dirname(article_dir), "additional_images"),
    )
    additional_images_dir = next(
        (path for path in candidate_dirs if os.path.isdir(path)), None)
    if additional_images_dir is None:
        return []

    # Compare lower-cased names so "photo.JPG" is found on case-sensitive file
    # systems too; skip dot-files (the glob patterns used before ignored them)
    # and sort so the result does not depend on directory listing order.
    image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".webp")
    image_files = sorted(
        os.path.join(additional_images_dir, name)
        for name in os.listdir(additional_images_dir)
        if not name.startswith(".") and name.lower().endswith(image_extensions)
    )

    logger.info(f"找到 {len(image_files)} 张额外图片: {additional_images_dir}")
    return image_files
|
||
|
||
def read_user_csv(user_csv_path, email_column, username_column):
    """Load the recipient list and keep only rows with a usable email address.

    Args:
        user_csv_path: Path of the user CSV file.
        email_column: Name of the column holding the email address.
        username_column: Name of the column holding the user name. When the
            column is missing it is created from the part of the address
            before the ``@``.

    Returns:
        DataFrame of the valid users (addresses stripped of surrounding
        whitespace), or ``None`` when the file cannot be read or the email
        column is missing.
    """
    try:
        df = pd.read_csv(user_csv_path)

        if email_column not in df.columns:
            logger.error(f"用户CSV中缺少邮箱列 '{email_column}'")
            return None

        # Keep rows whose address is present and contains an "@".
        emails = df[email_column]
        is_valid = emails.notna() & emails.astype(str).str.contains('@', regex=False)
        # Work on an independent copy: assigning columns to a filtered slice
        # triggers pandas' chained-assignment warning and may not stick.
        df = df[is_valid].copy()
        # Surrounding whitespace would otherwise end up in the SMTP recipient.
        df[email_column] = df[email_column].astype(str).str.strip()

        if username_column not in df.columns:
            logger.warning(f"用户CSV中缺少用户名列 '{username_column}',使用邮箱前缀作为用户名")
            df[username_column] = df[email_column].str.split('@').str[0]

        logger.info(f"成功读取 {len(df)} 个有效用户")
        return df
    except Exception as e:
        logger.error(f"读取用户CSV失败: {e}")
        return None
|
||
|
||
def read_manifest_csv(manifest_csv_path, judge_only_success=False):
    """Load the content manifest and validate the columns the pipeline needs.

    Args:
        manifest_csv_path: Path of the manifest CSV file.
        judge_only_success: When true and a ``JudgeStatus`` column exists,
            keep only the rows whose ``JudgeStatus`` equals ``True``.

    Returns:
        DataFrame with the manifest rows, or ``None`` when the file cannot be
        read or a required column is missing.
    """
    try:
        df = pd.read_csv(manifest_csv_path)

        # EntryID is read unconditionally by the allocation and CSV-building
        # steps, so its absence is reported here instead of surfacing later
        # as a generic allocation failure.
        required_columns = ['EntryID', 'OutputTxtPath', 'PosterPath', 'ArticleJsonPath']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.error(f"内容清单CSV中缺少必要列: {', '.join(missing_columns)}")
            return None

        if judge_only_success and 'JudgeStatus' in df.columns:
            original_count = len(df)
            df = df[df['JudgeStatus'].eq(True)]
            logger.info(f"只保留审核成功的内容,从 {original_count} 条过滤为 {len(df)} 条")

        logger.info(f"成功读取 {len(df)} 条内容")
        return df
    except Exception as e:
        logger.error(f"读取内容清单CSV失败: {e}")
        return None
|
||
|
||
def read_previous_distribution(previous_file, skip_sent_success=True):
    """Collect the article IDs that an earlier run already handed out.

    Three layouts are recognised by their columns:

    * ``entry_id`` + ``send_status`` (per-recipient rows with a status text),
    * ``entry_id`` + ``success`` (summary rows with a success counter),
    * ``EntryID`` + ``sent_success`` / ``assigned_count`` (manifest report).

    Args:
        previous_file: Path of the earlier result CSV; ``None`` or a missing
            file disables filtering.
        skip_sent_success: When true, only articles that were sent
            successfully are returned (if the file carries that information);
            otherwise every article that was assigned is returned.

    Returns:
        Set of article IDs; empty when nothing can be determined.
    """
    if not previous_file or not os.path.exists(previous_file):
        logger.info("未提供上一次分发结果文件或文件不存在,将不过滤已发放文章")
        return set()

    try:
        df = pd.read_csv(previous_file)
        columns = set(df.columns)

        if 'entry_id' in columns:
            if skip_sent_success and 'send_status' in columns:
                # astype(str) keeps this working when the column is all-NaN.
                sent_mask = df['send_status'].astype(str).str.contains('success', na=False)
                already_sent = df.loc[sent_mask, 'entry_id']
            elif skip_sent_success and 'success' in columns:
                # Summary layout: "success" is a per-article counter. This
                # case used to sit in a branch that could never be reached.
                already_sent = df.loc[df['success'] > 0, 'entry_id']
            else:
                already_sent = df['entry_id']
        elif 'EntryID' in columns:
            if skip_sent_success and 'sent_success' in columns:
                already_sent = df.loc[df['sent_success'] > 0, 'EntryID']
            elif 'assigned_count' in columns:
                already_sent = df.loc[df['assigned_count'] > 0, 'EntryID']
            else:
                # No counters available: treat every listed article as assigned.
                already_sent = df['EntryID']
        else:
            logger.warning(f"无法识别的分发结果文件格式: {previous_file}")
            return set()

        already_sent_set = set(already_sent.dropna().unique())
        logger.info(f"从上一次分发结果中找到 {len(already_sent_set)} 篇已发放文章")
        return already_sent_set

    except Exception as e:
        logger.error(f"读取上一次分发结果失败: {e}")
        return set()
|
||
|
||
def allocate_content_to_users(users_df, content_df, article_per_user,
                              email_column, username_column, max_send_count=None,
                              already_sent_ids=None):
    """Assign articles to users in a shuffled round-robin.

    Args:
        users_df: DataFrame of recipients.
        content_df: DataFrame of articles; must contain an ``EntryID`` column.
        article_per_user: Number of articles each user should receive.
        email_column: Column of ``users_df`` holding the email address.
        username_column: Column of ``users_df`` holding the user name; a
            missing value is replaced by ``用户_<row index>``.
        max_send_count: When a positive number, only the first that many
            users receive content.
        already_sent_ids: Optional collection of ``EntryID`` values to exclude.

    Returns:
        List of ``{'email', 'username', 'contents'}`` dicts, one per user, or
        an empty list when nothing can be allocated or an error occurs.
    """
    try:
        users = []
        for row_index, row in users_df.iterrows():
            username = row[username_column]
            if pd.isna(username):
                username = f"用户_{row_index}"
            users.append({
                'email': row[email_column],
                'username': username,
                'contents': [],
            })

        # Drop the articles that were already handed out.
        content_list = content_df.to_dict('records')
        if already_sent_ids:
            original_count = len(content_list)
            content_list = [c for c in content_list if c['EntryID'] not in already_sent_ids]
            filtered_count = original_count - len(content_list)
            logger.info(f"过滤掉 {filtered_count} 篇已发放文章,剩余 {len(content_list)} 篇可用")

            if not content_list:
                logger.warning("过滤后没有可用文章,请提供新内容或关闭过滤功能")
                return []

        # Nothing to hand out at all (previously only detected through an
        # IndexError raised further down).
        if not content_list:
            logger.warning("没有可分配的内容")
            return []

        random.shuffle(content_list)

        if max_send_count is not None and max_send_count > 0:
            users = users[:max_send_count]
            logger.info(f"限制发送用户数量为 {max_send_count}")

        # Never give one user more articles than exist; otherwise wrapping
        # around the list would hand the same article to that user twice.
        per_user = min(article_per_user, len(content_list))

        content_allocated = []
        content_index = 0
        wrap_warned = False
        for user in users:
            user_contents = []
            for _ in range(per_user):
                if content_index >= len(content_list):
                    # Out of fresh articles: start over from the beginning.
                    content_index = 0
                    if not wrap_warned:
                        logger.warning("内容不足,将循环使用现有内容")
                        wrap_warned = True

                content = content_list[content_index]
                user_contents.append(content)
                content_allocated.append(content)
                content_index += 1

            user['contents'] = user_contents

        logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容")
        unique_content_count = len({c['EntryID'] for c in content_allocated})
        logger.info(f"分配的唯一内容条数: {unique_content_count}")

        return users
    except Exception as e:
        logger.error(f"分配内容失败: {e}")
        return []
|
||
|
||
def prepare_distribution_csv(users_with_content, output_dir):
    """Flatten the per-user assignments into a timestamped distribution CSV.

    One row is written per (user, article) pair. The paths of the extra
    images found for the article are stored in a single column, joined
    with ``;``.

    Args:
        users_with_content: List of user dicts with ``email``, ``username``
            and ``contents`` (list of article records).
        output_dir: Directory the CSV is written to; created when missing.

    Returns:
        Path of the written CSV, or ``None`` when anything fails.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)

        records = []
        for recipient in users_with_content:
            for article in recipient['contents']:
                # Look up the extra images first, then assemble the row.
                extra_images = find_additional_images(article['OutputTxtPath'])
                record = {
                    'email': recipient['email'],
                    'username': recipient['username'],
                    'file_path': article['OutputTxtPath'],
                    'poster_path': article['PosterPath'],
                    'article_json_path': article['ArticleJsonPath'],
                    'additional_images': ';'.join(extra_images),
                    'entry_id': article['EntryID'],
                }
                # Optional manifest columns default to an empty string.
                for target_key, manifest_key in (
                        ('topic_index', 'TopicIndex'),
                        ('variant_index', 'VariantIndex'),
                        ('product', 'Product'),
                        ('object', 'Object'),
                        ('date', 'Date'),
                        ('logic', 'Logic')):
                    record[target_key] = article.get(manifest_key, '')
                records.append(record)

        frame = pd.DataFrame(records)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        distribution_csv = os.path.join(output_dir, f"distribution_{timestamp}.csv")
        frame.to_csv(distribution_csv, index=False)

        logger.info(f"分发CSV已保存到: {distribution_csv}")
        return distribution_csv
    except Exception as e:
        logger.error(f"准备分发CSV失败: {e}")
        return None
|
||
|
||
def _unique_arcname(file_path, used_names):
    """Pick an archive member name for *file_path* that is not in *used_names*."""
    base_name = os.path.basename(file_path)
    candidate = base_name
    if candidate in used_names:
        # Same file name from another folder: prefix the folder name so both
        # files survive and can still be told apart.
        parent_name = os.path.basename(os.path.dirname(os.path.abspath(file_path)))
        if parent_name:
            candidate = f"{parent_name}_{base_name}"
    stem, ext = os.path.splitext(candidate)
    counter = 2
    while candidate in used_names:
        candidate = f"{stem}_{counter}{ext}"
        counter += 1
    used_names.add(candidate)
    return candidate


def create_zip_file(files, output_path):
    """Pack the given files into a flat ZIP archive.

    Args:
        files: Iterable of file paths. Empty/``NaN`` entries and paths that do
            not exist are skipped; a path listed more than once is stored once.
        output_path: Path of the ZIP file to create. Its parent directory is
            created when missing.

    Returns:
        ``output_path`` on success, ``None`` when the archive cannot be
        written.
    """
    try:
        # dirname is '' for a bare file name, and os.makedirs('') raises.
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        used_names = set()
        seen_paths = set()
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in files:
                if not file_path or pd.isna(file_path):
                    continue

                if not os.path.exists(file_path):
                    logger.warning(f"文件不存在,跳过: {file_path}")
                    continue

                absolute_path = os.path.abspath(file_path)
                if absolute_path in seen_paths:
                    continue
                seen_paths.add(absolute_path)

                # Members are stored without directories, so equal base names
                # from different folders must be disambiguated.
                zipf.write(file_path, arcname=_unique_arcname(file_path, used_names))

        logger.info(f"成功创建ZIP文件: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"创建ZIP文件失败: {e}")
        return None
|
||
|
||
def send_single_email(from_addr, password, to_addr, subject, content, attachments,
                      smtp_server, smtp_port, use_ssl=False):
    """Send one plain-text email with file attachments.

    Args:
        from_addr: Sender address, also used as the SMTP login name.
        password: SMTP password / authorization code.
        to_addr: Recipient address.
        subject: Mail subject.
        content: Plain-text body (sent as UTF-8).
        attachments: Iterable of file paths; paths that do not exist are
            skipped with a warning.
        smtp_server: SMTP host name.
        smtp_port: SMTP port.
        use_ssl: Connect with ``smtplib.SMTP_SSL`` instead of ``smtplib.SMTP``.

    Returns:
        ``True`` when the message was handed to the server, ``False`` on any
        error (the error is logged).
    """
    try:
        message = MIMEMultipart()
        message['From'] = Header(from_addr)
        message['To'] = Header(to_addr)
        message['Subject'] = Header(subject)

        message.attach(MIMEText(content, 'plain', 'utf-8'))

        for attachment in attachments:
            if not os.path.exists(attachment):
                logger.warning(f"附件不存在,跳过: {attachment}")
                continue

            with open(attachment, 'rb') as f:
                part = MIMEApplication(f.read())
            # The file name is RFC 2047-encoded so non-ASCII names survive.
            part.add_header('Content-Disposition', 'attachment',
                            filename=Header(os.path.basename(attachment), 'utf-8').encode())
            message.attach(part)

        # The context manager closes the connection even when login or
        # sendmail raises; previously the socket leaked in that case.
        smtp_class = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        with smtp_class(smtp_server, smtp_port) as server:
            server.login(from_addr, password)
            server.sendmail(from_addr, to_addr, message.as_string())

        logger.info(f"成功发送邮件到: {to_addr}")
        return True
    except Exception as e:
        logger.error(f"发送邮件到 {to_addr} 失败: {e}")
        return False
|
||
|
||
def send_emails(distribution_csv, output_dir, email_from, email_password,
                subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None):
    """Send every recipient one email carrying a ZIP of their assigned files.

    The distribution CSV is grouped by its ``email`` column. For each address
    the existing text file, poster and extra images of all its rows are
    packed into a ZIP under ``<output_dir>/temp_zips`` and mailed through
    ``send_single_email``.

    Args:
        distribution_csv: Path of the distribution CSV to read.
        output_dir: Directory for the temporary ZIPs and the result CSV.
        email_from: Sender address passed to ``send_single_email``.
        email_password: Sender password passed to ``send_single_email``.
        subject: Mail subject.
        smtp_server: SMTP host.
        smtp_port: SMTP port.
        use_ssl: Forwarded to ``send_single_email``.
        test_mode: When true no mail is sent; the recipient is recorded as
            ``"success (test mode)"`` and the ZIP files are not deleted.
        delay: Seconds to sleep after each send attempt (ignored in test mode
            or when not positive).
        zip_filename: Optional base name for the ZIP; when given, every
            recipient's archive is written to the same ``<zip_filename>.zip``
            path.

    Returns:
        Path of the ``email_results_*.csv`` file, or ``None`` when writing it
        fails or an unexpected error aborts the run.
    """
    try:
        # Make sure the output directory and the temporary ZIP folder exist.
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True)

        # Load the distribution CSV.
        df = pd.read_csv(distribution_csv)
        logger.info(f"从 {distribution_csv} 读取了 {len(df)} 条分发记录")

        # Group the rows by recipient address.
        email_groups = {}
        for _, row in df.iterrows():
            email = row['email']
            if email not in email_groups:
                email_groups[email] = []
            email_groups[email].append(row)

        logger.info(f"共有 {len(email_groups)} 个邮箱需要发送")

        # Running totals plus one detail record per recipient.
        results = {
            "total": len(email_groups),
            "success": 0,
            "failed": 0,
            "details": []
        }

        # ZIP files created during this run (removed at the end).
        temp_zips = []

        # Process one recipient at a time.
        for email, rows in email_groups.items():
            try:
                logger.info(f"处理邮箱: {email}")

                # Collect every file that exists on disk for this recipient.
                files = []
                for row in rows:
                    # Article text file.
                    file_path = row.get('file_path')
                    if file_path and not pd.isna(file_path) and os.path.exists(file_path):
                        files.append(file_path)

                    # Poster image.
                    poster_path = row.get('poster_path')
                    if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path):
                        files.append(poster_path)

                    # Extra images, stored as a ';'-separated string.
                    additional_images = row.get('additional_images', '')
                    if additional_images and not pd.isna(additional_images):
                        for img_path in additional_images.split(';'):
                            if img_path.strip() and os.path.exists(img_path):
                                files.append(img_path)

                # Nothing to attach: record as skipped (counted as failed).
                if not files:
                    logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过")
                    results["details"].append({
                        "email": email,
                        "status": "skipped",
                        "reason": "没有有效的附件文件",
                        "files": []
                    })
                    results["failed"] += 1
                    continue

                # Choose the ZIP file name.
                if zip_filename:
                    # Caller-supplied base name.
                    zip_filename_with_ext = f"{zip_filename}.zip"
                else:
                    # Default: timestamp plus a short hash of the address.
                    zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip"

                zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext)

                zip_file = create_zip_file(files, zip_path)
                if zip_file:
                    temp_zips.append(zip_file)
                else:
                    logger.error(f"为邮箱 {email} 创建ZIP文件失败,跳过")
                    results["details"].append({
                        "email": email,
                        "status": "failed",
                        "reason": "创建ZIP文件失败",
                        "files": files
                    })
                    results["failed"] += 1
                    continue

                # Build the mail body.
                # NOTE(review): files_count and entries_count are not used
                # anywhere below.
                files_count = len(files)
                entries_count = len(rows)

                email_content = f"""您好!请查收今日带货笔记(文案+配图),内容在文件压缩包内。具体挂载商品等操作流程请查看对应达人微信群内信息。

"""

                # Send the mail, or only simulate it in test mode.
                if test_mode:
                    logger.info(f"测试模式: 模拟发送邮件到 {email}")
                    results["details"].append({
                        "email": email,
                        "status": "success (test mode)",
                        "files": files
                    })
                    results["success"] += 1
                else:
                    success = send_single_email(
                        email_from,
                        email_password,
                        email,
                        subject,
                        email_content,
                        [zip_file],
                        smtp_server,
                        smtp_port,
                        use_ssl
                    )

                    if success:
                        results["success"] += 1
                        results["details"].append({
                            "email": email,
                            "status": "success",
                            "files": files
                        })
                    else:
                        results["failed"] += 1
                        results["details"].append({
                            "email": email,
                            "status": "failed",
                            "reason": "SMTP发送失败",
                            "files": files
                        })

                # Pause between mails so they are not sent too quickly.
                if not test_mode and delay > 0:
                    logger.info(f"等待 {delay} 秒后继续发送下一封邮件...")
                    time.sleep(delay)

            except Exception as e:
                # An error for one recipient must not stop the others.
                logger.error(f"处理邮箱 {email} 时出错: {e}")
                results["failed"] += 1
                results["details"].append({
                    "email": email,
                    "status": "failed",
                    "reason": str(e),
                    "files": []
                })

        # Log a summary of the run.
        logger.info("\n===== 邮件发送结果摘要 =====")
        logger.info(f"总计: {results['total']} 个邮箱")
        logger.info(f"成功: {results['success']} 个邮箱")
        logger.info(f"失败: {results['failed']} 个邮箱")

        # Remove the temporary ZIPs (kept in test mode).
        if not test_mode:
            logger.info("清理临时ZIP文件...")
            for temp_zip in temp_zips:
                try:
                    if os.path.exists(temp_zip):
                        os.remove(temp_zip)
                        logger.info(f"已删除临时文件: {temp_zip}")
                except Exception as e:
                    logger.warning(f"删除临时文件失败: {temp_zip}, 错误: {e}")

        # Write the per-recipient details to a CSV file.
        try:
            result_df = pd.DataFrame(results["details"])
            result_csv = os.path.join(output_dir,
                                      f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            result_df.to_csv(result_csv, index=False)
            logger.info(f"发送结果已保存到: {result_csv}")
            return result_csv
        except Exception as e:
            logger.error(f"保存结果到CSV失败: {e}")
            return None

    except Exception as e:
        logger.error(f"发送邮件过程中发生错误: {e}")
        return None
|
||
|
||
def generate_distribution_report(distribution_csv, send_result_csv, manifest_csv, output_dir):
    """Combine assignment and send results into report files.

    Writes three files into ``output_dir``, each with a timestamp in its name:

    * ``distribution_report_*.csv`` - the manifest plus ``assigned_count``,
      ``sent_success`` and ``is_distributed`` columns,
    * ``distribution_summary_*.csv`` - one row per assigned article (only
      written when at least one article was assigned),
    * ``article_stats_*.json`` - the per-article counters.

    Args:
        distribution_csv: Path of the distribution CSV (one row per
            user/article pair).
        send_result_csv: Path of the send-result CSV; when falsy or missing,
            every row gets the send status ``'unknown'``.
        manifest_csv: Path of the original manifest CSV.
        output_dir: Directory the report files are written to.

    Returns:
        Path of the report CSV, or ``None`` when any step raises.
    """
    try:
        # Load the distribution CSV.
        dist_df = pd.read_csv(distribution_csv)

        # Load the send results, if available.
        if send_result_csv and os.path.exists(send_result_csv):
            result_df = pd.read_csv(send_result_csv)
            # Map each address to its send status.
            email_status = {}
            for _, row in result_df.iterrows():
                email_status[row['email']] = row['status']

            dist_df['send_status'] = dist_df['email'].map(email_status)
        else:
            dist_df['send_status'] = 'unknown'

        # Aggregate the counters per article.
        article_stats = {}
        for _, row in dist_df.iterrows():
            entry_id = row['entry_id']
            if entry_id not in article_stats:
                article_stats[entry_id] = {
                    'total_assigned': 0,
                    'sent_success': 0,
                    'sent_failed': 0,
                    'sent_unknown': 0,
                    'product': row.get('product', ''),
                    'object': row.get('object', ''),
                    'date': row.get('date', ''),
                    'logic': row.get('logic', '')
                }

            article_stats[entry_id]['total_assigned'] += 1

            # Any status containing "success" counts as sent (this includes
            # "success (test mode)").
            if 'success' in str(row['send_status']).lower():
                article_stats[entry_id]['sent_success'] += 1
            elif row['send_status'] in ['failed', 'skipped']:
                article_stats[entry_id]['sent_failed'] += 1
            else:
                article_stats[entry_id]['sent_unknown'] += 1

        # Load the original manifest.
        manifest_df = pd.read_csv(manifest_csv)

        # Manifest copy annotated with the distribution counters.
        manifest_with_dist = manifest_df.copy()
        manifest_with_dist['assigned_count'] = manifest_with_dist['EntryID'].map(
            {k: v['total_assigned'] for k, v in article_stats.items()})
        manifest_with_dist['sent_success'] = manifest_with_dist['EntryID'].map(
            {k: v['sent_success'] for k, v in article_stats.items()})

        # Articles that were never assigned have no entry: treat as 0.
        manifest_with_dist['assigned_count'] = manifest_with_dist['assigned_count'].fillna(0).astype(int)
        manifest_with_dist['sent_success'] = manifest_with_dist['sent_success'].fillna(0).astype(int)

        # Flag whether the article was assigned at least once.
        manifest_with_dist['is_distributed'] = manifest_with_dist['assigned_count'] > 0

        # Save the report.
        report_csv = os.path.join(output_dir, f"distribution_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
        manifest_with_dist.to_csv(report_csv, index=False)

        # Build the compact per-article summary.
        summary_data = []
        for entry_id, stats in article_stats.items():
            summary_data.append({
                'entry_id': entry_id,
                'product': stats['product'],
                'object': stats['object'],
                'date': stats['date'],
                'logic': stats['logic'],
                'assigned': stats['total_assigned'],
                'success': stats['sent_success'],
                'failed': stats['sent_failed'],
                'unknown': stats['sent_unknown']
            })

        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            summary_df.to_csv(summary_csv, index=False)
            logger.info(f"分发摘要报告已保存到: {summary_csv}")

        # Save the raw per-article counters as JSON.
        stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(stats_json, 'w') as f:
            json.dump(article_stats, f, indent=2)

        # Overall numbers for the log.
        total_articles = len(manifest_df)
        distributed_articles = sum(1 for _, v in article_stats.items() if v['total_assigned'] > 0)
        success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0)

        logger.info(f"分发报告已保存到: {report_csv}")
        logger.info(f"文章统计已保存到: {stats_json}")
        logger.info("\n===== 分发统计 =====")
        logger.info(f"总文章数: {total_articles}")
        # NOTE(review): a manifest with zero rows makes the two divisions
        # below raise ZeroDivisionError, which the outer handler turns into a
        # None return even though the report files were already written.
        logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
        logger.info(f"成功发送文章数: {success_sent} ({success_sent/total_articles*100:.2f}%)")

        # Per-product breakdown (only when the manifest has a Product column).
        if 'Product' in manifest_df.columns:
            product_stats = manifest_with_dist.groupby('Product')['is_distributed'].value_counts().unstack().fillna(0)
            if not product_stats.empty:
                logger.info("\n===== 按产品分发统计 =====")
                for product, row in product_stats.iterrows():
                    # Only products with a "distributed" column are logged.
                    if True in row:
                        distributed = row.get(True, 0)
                        total = row.sum()
                        logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)")

        return report_csv
    except Exception as e:
        logger.error(f"生成分发报告失败: {e}")
        return None
|
||
|
||
def main():
    """Run the whole distribution pipeline from the command line.

    Stages, in order: parse the options, load the recipients and the
    manifest, optionally load the IDs handed out by an earlier run, assign
    articles, write the distribution CSV, send the emails and write the
    report. When loading, assignment or CSV preparation fails, an error is
    logged and the run ends early.
    """
    options = parse_arguments()

    # Later stages all write into the output directory.
    os.makedirs(options.output_dir, exist_ok=True)

    # Recipients.
    recipients_df = read_user_csv(
        options.user_csv, options.email_column, options.username_column)
    if recipients_df is None:
        logger.error("无法处理用户CSV,程序退出")
        return

    # Content manifest.
    manifest_df = read_manifest_csv(options.manifest_csv, options.judge_only_success)
    if manifest_df is None:
        logger.error("无法处理内容清单CSV,程序退出")
        return

    # Articles handed out by a previous run (only when a file was given).
    previously_sent = (
        read_previous_distribution(options.previous_distribution, options.skip_sent_success)
        if options.previous_distribution
        else None
    )

    # Assign articles to recipients.
    assignments = allocate_content_to_users(
        recipients_df,
        manifest_df,
        options.article_per_user,
        options.email_column,
        options.username_column,
        options.max_send_count,
        previously_sent,
    )
    if not assignments:
        logger.error("内容分配失败,程序退出")
        return

    # Distribution CSV.
    distribution_csv = prepare_distribution_csv(assignments, options.output_dir)
    if not distribution_csv:
        logger.error("准备分发CSV失败,程序退出")
        return

    # Send the emails.
    send_result_csv = send_emails(
        distribution_csv,
        options.output_dir,
        options.email_from,
        options.email_password,
        options.subject,
        options.smtp_server,
        options.smtp_port,
        options.use_ssl,
        options.test_mode,
        options.delay,
        options.zip_filename,
    )

    # Report (runs even when the send step returned no result file).
    generate_distribution_report(
        distribution_csv, send_result_csv, options.manifest_csv, options.output_dir)

    logger.info("内容分发流程完成")
|
||
|
||
# Script entry point: run the pipeline only when executed directly, not when
# the module is imported. (A stray trailing "|" after the call was removed.)
if __name__ == "__main__":
    main()