TravelContentCreator/scripts/distribution/distribute_content.py

866 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pandas as pd
import argparse
import random
import logging
import sqlite3
from datetime import datetime
import json
import smtplib
import zipfile
import time
import glob
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header
# Configure logging: INFO level, mirrored to a per-run timestamped log file
# and to the console (StreamHandler defaults to stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # One log file per invocation, named by start time.
        logging.FileHandler(f"content_distribution_db_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
def parse_arguments():
    """Build and evaluate the command-line interface.

    Returns:
        argparse.Namespace with all distribution options (credentials,
        database path, content/user filters, SMTP settings, test flags).
    """
    cli = argparse.ArgumentParser(description='内容分发 (数据库版)')

    # Output location and sender credentials (credentials are mandatory).
    cli.add_argument('--output-dir', type=str, default='distribution_results',
                     help='分发结果输出目录')
    cli.add_argument('--email-from', type=str, required=True,
                     help='发件人邮箱地址')
    cli.add_argument('--email-password', type=str, required=True,
                     help='发件人邮箱授权码')

    # SQLite database location.
    cli.add_argument('--db-path', type=str, default='/root/autodl-tmp/TravelContentCreator/distribution.db',
                     help='数据库文件路径')

    # Content-side filters.
    cli.add_argument('--product', type=str, default=None,
                     help='指定产品名称进行筛选')
    cli.add_argument('--object', type=str, default=None,
                     help='指定景点名称进行筛选')
    cli.add_argument('--undistributed-only', action='store_true',
                     help='只选择未分发的内容')

    # User-side filters.
    cli.add_argument('--user-id', type=int, default=None,
                     help='指定用户ID进行分发')
    cli.add_argument('--user-email', type=str, default=None,
                     help='指定用户邮箱进行分发')
    cli.add_argument('--max-users', type=int, default=None,
                     help='最大用户数量')

    # Delivery tuning: allocation size, mail transport, pacing.
    cli.add_argument('--article-per-user', type=int, default=3,
                     help='每个用户分配的文章数量')
    cli.add_argument('--subject', type=str, default='您的旅游内容创作已就绪',
                     help='邮件主题')
    cli.add_argument('--smtp-server', type=str, default='smtp.163.com',
                     help='SMTP服务器地址')
    cli.add_argument('--smtp-port', type=int, default=25,
                     help='SMTP服务器端口')
    cli.add_argument('--use-ssl', action='store_true',
                     help='使用SSL连接SMTP服务器')
    cli.add_argument('--judge-only-success', action='store_true',
                     help='只分发审核成功的内容')
    cli.add_argument('--test-mode', action='store_true',
                     help='测试模式,不实际发送邮件')
    cli.add_argument('--delay', type=int, default=2,
                     help='每封邮件发送之间的延迟时间(秒)')
    cli.add_argument('--zip-filename', type=str, default=None,
                     help='指定ZIP压缩包的基本文件名不含扩展名"文旅小红书带货笔记内容0512"')

    return cli.parse_args()
def create_database_connection(db_path):
    """Open a SQLite connection to *db_path*.

    Returns:
        sqlite3.Connection with dict-style row access enabled, or None
        when the connection attempt fails (error is logged, not raised).
    """
    try:
        connection = sqlite3.connect(db_path)
    except sqlite3.Error as e:
        logger.error(f"连接数据库失败: {e}")
        return None
    # Rows behave like mappings, so callers can index by column name.
    connection.row_factory = sqlite3.Row
    return connection
def query_contents_from_database(conn, product=None, object=None, judge_only_success=False, undistributed_only=False):
    """Fetch rows from the ``contents`` table as a pandas DataFrame.

    Args:
        conn: Open sqlite3 connection whose row_factory yields mapping rows.
        product: Optional substring filter on the product column.
        object: Optional substring filter on the object (attraction) column.
            NOTE(review): shadows the builtin name; kept because callers pass
            it as a keyword argument.
        judge_only_success: Keep only rows with judge_status = 1.
        undistributed_only: Keep only rows with is_distributed = 0.

    Returns:
        DataFrame of matching rows (possibly empty), or None on error.
    """
    try:
        where_parts = []
        bind_values = []
        if product:
            where_parts.append("product LIKE ?")
            bind_values.append(f"%{product}%")
        if object:
            where_parts.append("object LIKE ?")
            bind_values.append(f"%{object}%")
        if judge_only_success:
            where_parts.append("judge_status = 1")
        if undistributed_only:
            where_parts.append("is_distributed = 0")

        sql = "SELECT * FROM contents"
        if where_parts:
            sql = sql + " WHERE " + " AND ".join(where_parts)

        rows = conn.cursor().execute(sql, bind_values).fetchall()

        # Rename DB columns to the PascalCase keys expected downstream;
        # 'id' stays as the internal key used for later database updates.
        field_map = (
            ('id', 'id'),
            ('EntryID', 'entry_id'),
            ('OutputTxtPath', 'output_txt_path'),
            ('PosterPath', 'poster_path'),
            ('ArticleJsonPath', 'article_json_path'),
            ('Product', 'product'),
            ('Object', 'object'),
            ('Date', 'date'),
            ('Logic', 'logic'),
            ('JudgeStatus', 'judge_status'),
            ('IsDistributed', 'is_distributed'),
        )
        content_df = pd.DataFrame(
            [{dst: row[src] for dst, src in field_map} for row in rows]
        )

        # Log what was queried and which filters were active.
        logger.info(f"从数据库查询到 {len(content_df)} 条内容")
        if product:
            logger.info(f"按产品筛选: {product}")
        if object:
            logger.info(f"按景点筛选: {object}")
        if judge_only_success:
            logger.info("仅显示审核通过的内容")
        if undistributed_only:
            logger.info("仅显示未分发的内容")
        return content_df
    except Exception as e:
        logger.error(f"从数据库查询内容失败: {e}")
        return None
def query_users_from_database(conn, user_id=None, user_email=None, max_users=None):
    """Fetch rows from the ``users`` table as a pandas DataFrame.

    Args:
        conn: Open sqlite3 connection whose row_factory yields mapping rows.
        user_id: Optional exact id filter (a 0 id would be skipped by the
            truthiness test; ids are assumed positive — TODO confirm).
        user_email: Optional substring filter on the email column.
        max_users: Optional cap on the number of returned users.

    Returns:
        DataFrame with id/email/username columns, or None on error.
    """
    try:
        cursor = conn.cursor()
        conditions = []
        params = []
        if user_id:
            conditions.append("id = ?")
            params.append(user_id)
        if user_email:
            conditions.append("email LIKE ?")
            params.append(f"%{user_email}%")
        sql = "SELECT id, email, username FROM users"
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        if max_users:
            # Bind LIMIT as a parameter instead of f-string interpolation:
            # avoids building SQL from raw values and lets SQLite validate it.
            sql += " LIMIT ?"
            params.append(max_users)
        cursor.execute(sql, params)
        results = cursor.fetchall()
        user_list = []
        for row in results:
            user_list.append({
                'id': row['id'],
                'email': row['email'],
                # Fall back to the local part of the address when the
                # username column is NULL or empty.
                'username': row['username'] or row['email'].split('@')[0]
            })
        user_df = pd.DataFrame(user_list)
        # Log what was queried and which filters were active.
        logger.info(f"从数据库查询到 {len(user_df)} 个用户")
        if user_id:
            logger.info(f"按用户ID筛选: {user_id}")
        if user_email:
            logger.info(f"按用户邮箱筛选: {user_email}")
        if max_users:
            logger.info(f"限制最大用户数: {max_users}")
        return user_df
    except Exception as e:
        logger.error(f"从数据库查询用户失败: {e}")
        return None
def allocate_content_to_users(users_df, content_df, article_per_user):
    """Distribute shuffled content records across users, round-robin style.

    Each user receives ``article_per_user`` records; when the pool runs out
    it wraps around and records are reused (a warning is logged per wrap).

    Returns:
        List of user dicts (id/email/username/contents), or [] on failure
        or when there is no content to hand out.
    """
    try:
        users = [
            {'id': r['id'], 'email': r['email'], 'username': r['username'], 'contents': []}
            for _, r in users_df.iterrows()
        ]
        content_pool = content_df.to_dict('records')
        if not content_pool:
            logger.warning("没有可用内容进行分配")
            return []

        # Randomize once so every run hands out a different ordering.
        random.shuffle(content_pool)

        handed_out = []
        cursor = 0
        for user in users:
            picks = []
            for _ in range(article_per_user):
                if cursor >= len(content_pool):
                    # Pool exhausted: wrap to the start and reuse records.
                    cursor = 0
                    logger.warning("内容不足,将循环使用现有内容")
                picks.append(content_pool[cursor])
                handed_out.append(content_pool[cursor])
                cursor += 1
            user['contents'] = picks

        logger.info(f"已为 {len(users)} 个用户分配 {len(handed_out)} 条内容")
        unique_content_count = len({c['EntryID'] for c in handed_out})
        logger.info(f"分配的唯一内容条数: {unique_content_count}")
        return users
    except Exception as e:
        logger.error(f"分配内容失败: {e}")
        return []
def find_additional_images(file_path):
    """Locate extra images stored alongside an article's text file.

    Looks for an ``additional_images`` directory next to the article file,
    then one directory level up.

    Returns:
        List of matching image paths; empty when the article file or the
        images directory does not exist.
    """
    if not file_path or pd.isna(file_path) or not os.path.exists(file_path):
        return []

    article_dir = os.path.dirname(file_path)
    # Candidate locations, in priority order: sibling dir, then parent's.
    candidates = (
        os.path.join(article_dir, "additional_images"),
        os.path.join(os.path.dirname(article_dir), "additional_images"),
    )
    images_dir = next((d for d in candidates if os.path.exists(d)), None)
    if images_dir is None:
        return []

    found = []
    for pattern in ("*.jpg", "*.jpeg", "*.png", "*.gif", "*.webp"):
        found.extend(glob.glob(os.path.join(images_dir, pattern)))
    logger.info(f"找到 {len(found)} 张额外图片: {images_dir}")
    return found
def prepare_distribution_csv(users_with_content, output_dir):
    """Flatten user→content assignments into a timestamped distribution CSV.

    One row is written per (user, content) pair, including any extra images
    discovered next to the article file.

    Returns:
        Path of the written CSV, or None on error.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)

        records = []
        for user in users_with_content:
            for item in user['contents']:
                extra_imgs = find_additional_images(item['OutputTxtPath'])
                records.append({
                    'user_id': user['id'],
                    'email': user['email'],
                    'username': user['username'],
                    'content_id': item['id'],
                    'entry_id': item['EntryID'],
                    'file_path': item['OutputTxtPath'],
                    'poster_path': item['PosterPath'],
                    'article_json_path': item['ArticleJsonPath'],
                    # Extra images serialized as one semicolon-separated field.
                    'additional_images': ';'.join(extra_imgs),
                    'product': item.get('Product', ''),
                    'object': item.get('Object', ''),
                    'date': item.get('Date', ''),
                    'logic': item.get('Logic', '')
                })

        csv_path = os.path.join(
            output_dir,
            f"distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        pd.DataFrame(records).to_csv(csv_path, index=False)
        logger.info(f"分发CSV已保存到: {csv_path}")
        return csv_path
    except Exception as e:
        logger.error(f"准备分发CSV失败: {e}")
        return None
def create_zip_file(files, output_path):
    """Bundle the given files into a ZIP archive at *output_path*.

    Missing or NaN entries are skipped with a warning; existing files are
    stored by basename, so the archive has a flat layout.

    Returns:
        The archive path on success, or None if it could not be written.
    """
    try:
        # Only create a parent directory when the path actually has one;
        # os.makedirs('') raises for a bare filename (original bug).
        parent = os.path.dirname(output_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in files:
                if not file_path or pd.isna(file_path):
                    continue
                if os.path.exists(file_path):
                    arcname = os.path.basename(file_path)
                    zipf.write(file_path, arcname=arcname)
                else:
                    logger.warning(f"文件不存在,跳过: {file_path}")
        logger.info(f"成功创建ZIP文件: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"创建ZIP文件失败: {e}")
        return None
def send_single_email(from_addr, password, to_addr, subject, content, attachments,
                      smtp_server, smtp_port, use_ssl=False):
    """Send one plain-text email with file attachments over SMTP.

    Args:
        from_addr: Sender address (also used as the SMTP login name).
        password: SMTP auth password / app authorization code.
        to_addr: Recipient address.
        subject: Message subject line.
        content: Plain-text body (sent as UTF-8).
        attachments: Iterable of file paths; missing files are skipped.
        smtp_server / smtp_port: SMTP endpoint.
        use_ssl: Use SMTP over TLS (SMTP_SSL) instead of plain SMTP.

    Returns:
        True on success, False on any failure (logged, never raised).
    """
    try:
        message = MIMEMultipart()
        message['From'] = Header(from_addr)
        message['To'] = Header(to_addr)
        message['Subject'] = Header(subject)
        message.attach(MIMEText(content, 'plain', 'utf-8'))

        for attachment in attachments:
            if not os.path.exists(attachment):
                logger.warning(f"附件不存在,跳过: {attachment}")
                continue
            with open(attachment, 'rb') as f:
                part = MIMEApplication(f.read())
            # RFC 2047-encode the filename so non-ASCII names survive transport.
            part.add_header('Content-Disposition', 'attachment',
                            filename=Header(os.path.basename(attachment), 'utf-8').encode())
            message.attach(part)

        smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
        # Context manager guarantees QUIT/close even when login or sendmail
        # raises — the original leaked the socket on the error path.
        with smtp_cls(smtp_server, smtp_port) as server:
            server.login(from_addr, password)
            server.sendmail(from_addr, to_addr, message.as_string())
        logger.info(f"成功发送邮件到: {to_addr}")
        return True
    except Exception as e:
        logger.error(f"发送邮件到 {to_addr} 失败: {e}")
        return False
def send_emails(distribution_csv, output_dir, email_from, email_password,
                subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None):
    """Send one email (with a ZIP of all assigned files) per recipient.

    Reads the allocation CSV, groups rows by recipient address, zips each
    recipient's article text + poster + extra images, sends it via
    send_single_email(), and writes a per-recipient result CSV.

    Args:
        distribution_csv: CSV produced by prepare_distribution_csv().
        output_dir: Directory for temp ZIPs and the results CSV.
        email_from / email_password: Sender credentials.
        subject: Mail subject for every recipient.
        smtp_server / smtp_port / use_ssl: SMTP transport settings.
        test_mode: When True, no mail is sent and temp ZIPs are kept.
        delay: Seconds to sleep between real sends (throttling).
        zip_filename: Fixed base name for the ZIP; when set, every
            recipient's archive reuses the same file name (and path), so
            archives overwrite each other — presumably intentional for a
            single shared batch name; verify with the caller.

    Returns:
        Path of the results CSV, or None on error.
    """
    try:
        # Ensure the output directory and a scratch area for ZIPs exist.
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp_zips"), exist_ok=True)
        # Load the allocation produced by prepare_distribution_csv().
        df = pd.read_csv(distribution_csv)
        logger.info(f"{distribution_csv} 读取了 {len(df)} 条分发记录")
        # Group rows by recipient address: one email per mailbox.
        email_groups = {}
        for _, row in df.iterrows():
            email = row['email']
            if email not in email_groups:
                email_groups[email] = []
            email_groups[email].append(row)
        logger.info(f"共有 {len(email_groups)} 个邮箱需要发送")
        # Outcome bookkeeping, later dumped to the results CSV.
        results = {
            "total": len(email_groups),
            "success": 0,
            "failed": 0,
            "details": []
        }
        # ZIP archives created during this run (removed after sending).
        temp_zips = []
        for email, rows in email_groups.items():
            try:
                logger.info(f"处理邮箱: {email}")
                # Collect every attachment for this recipient: article text,
                # poster image, and any extra images.
                files = []
                for row in rows:
                    file_path = row.get('file_path')
                    if file_path and not pd.isna(file_path) and os.path.exists(file_path):
                        files.append(file_path)
                    poster_path = row.get('poster_path')
                    if poster_path and not pd.isna(poster_path) and os.path.exists(poster_path):
                        files.append(poster_path)
                    # additional_images is a semicolon-separated path list.
                    additional_images = row.get('additional_images', '')
                    if additional_images and not pd.isna(additional_images):
                        for img_path in additional_images.split(';'):
                            if img_path.strip() and os.path.exists(img_path):
                                files.append(img_path)
                if not files:
                    # Nothing to attach — recorded as skipped, counted as failed.
                    logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过")
                    results["details"].append({
                        "user_id": rows[0]['user_id'],
                        "email": email,
                        "status": "skipped",
                        "reason": "没有有效的附件文件",
                        "files": []
                    })
                    results["failed"] += 1
                    continue
                # Archive name: caller-fixed, or unique per recipient via
                # timestamp + a 4-digit hash of the address.
                if zip_filename:
                    zip_filename_with_ext = f"{zip_filename}.zip"
                else:
                    zip_filename_with_ext = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(email) % 10000}.zip"
                zip_path = os.path.join(output_dir, "temp_zips", zip_filename_with_ext)
                zip_file = create_zip_file(files, zip_path)
                if zip_file:
                    temp_zips.append(zip_file)
                else:
                    logger.error(f"为邮箱 {email} 创建ZIP文件失败跳过")
                    results["details"].append({
                        "user_id": rows[0]['user_id'],
                        "email": email,
                        "status": "failed",
                        "reason": "创建ZIP文件失败",
                        "files": files
                    })
                    results["failed"] += 1
                    continue
                # Compose the (fixed Chinese) body text.
                files_count = len(files)
                entries_count = len(rows)
                email_content = f"""您好!请查收今日带货笔记(文案+配图),内容在文件压缩包内。具体挂载商品等操作流程请查看对应达人微信群内信息。
共包含{entries_count}篇文章内容,请按照要求发布。
"""
                # Send (or, in test mode, only pretend to and log).
                if test_mode:
                    logger.info(f"测试模式: 模拟发送邮件到 {email}")
                    results["details"].append({
                        "user_id": rows[0]['user_id'],
                        "email": email,
                        "status": "success (test mode)",
                        "files": files
                    })
                    results["success"] += 1
                else:
                    success = send_single_email(
                        email_from,
                        email_password,
                        email,
                        subject,
                        email_content,
                        [zip_file],
                        smtp_server,
                        smtp_port,
                        use_ssl
                    )
                    if success:
                        results["success"] += 1
                        results["details"].append({
                            "user_id": rows[0]['user_id'],
                            "email": email,
                            "status": "success",
                            "files": files
                        })
                    else:
                        results["failed"] += 1
                        results["details"].append({
                            "user_id": rows[0]['user_id'],
                            "email": email,
                            "status": "failed",
                            "reason": "SMTP发送失败",
                            "files": files
                        })
                # Throttle between real sends to avoid provider rate limits.
                if not test_mode and delay > 0:
                    logger.info(f"等待 {delay} 秒后继续发送下一封邮件...")
                    time.sleep(delay)
            except Exception as e:
                # Per-recipient failure must not abort the whole batch.
                logger.error(f"处理邮箱 {email} 时出错: {e}")
                results["failed"] += 1
                results["details"].append({
                    "email": email,
                    "status": "failed",
                    "reason": str(e),
                    "files": []
                })
        # Batch summary.
        logger.info("\n===== 邮件发送结果摘要 =====")
        logger.info(f"总计: {results['total']} 个邮箱")
        logger.info(f"成功: {results['success']} 个邮箱")
        logger.info(f"失败: {results['failed']} 个邮箱")
        # Remove scratch ZIPs (kept in test mode for inspection).
        if not test_mode:
            logger.info("清理临时ZIP文件...")
            for temp_zip in temp_zips:
                try:
                    if os.path.exists(temp_zip):
                        os.remove(temp_zip)
                        logger.info(f"已删除临时文件: {temp_zip}")
                except Exception as e:
                    logger.warning(f"删除临时文件失败: {temp_zip}, 错误: {e}")
        # Persist per-recipient outcomes to a timestamped CSV.
        try:
            result_df = pd.DataFrame(results["details"])
            result_csv = os.path.join(output_dir,
                                      f"email_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            result_df.to_csv(result_csv, index=False)
            logger.info(f"发送结果已保存到: {result_csv}")
            return result_csv
        except Exception as e:
            logger.error(f"保存结果到CSV失败: {e}")
            return None
    except Exception as e:
        logger.error(f"发送邮件过程中发生错误: {e}")
        return None
def record_distribution_to_database(conn, distribution_csv, send_result_csv):
    """Persist one distribution batch into the ``distributions`` table.

    Joins the allocation CSV with the per-recipient send results, inserts
    one row per (content, user) pair under a fresh batch id, and flags
    successfully-sent content as distributed.

    Returns:
        True on success (partial row failures only warn), False otherwise.
    """
    try:
        dist_df = pd.read_csv(distribution_csv)
        # Attach a send_status column by mapping each recipient's result;
        # fall back to 'unknown' when no result CSV is available.
        if send_result_csv and os.path.exists(send_result_csv):
            result_df = pd.read_csv(send_result_csv)
            status_by_email = {r['email']: r['status'] for _, r in result_df.iterrows()}
            dist_df['send_status'] = dist_df['email'].map(status_by_email)
        else:
            dist_df['send_status'] = 'unknown'

        cursor = conn.cursor()
        batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        inserted = 0
        for _, row in dist_df.iterrows():
            try:
                status = row.get('send_status', 'unknown')
                cursor.execute(
                    """INSERT INTO distributions
                       (content_id, user_id, distribution_date, send_status, batch_id)
                       VALUES (?, ?, ?, ?, ?)""",
                    (
                        row['content_id'],
                        row['user_id'],
                        datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        status,
                        batch_id,
                    ),
                )
                inserted += 1
                # A successful send marks the content itself as distributed
                # ('success (test mode)' also matches the substring check).
                if 'success' in str(status).lower():
                    cursor.execute(
                        "UPDATE contents SET is_distributed = 1 WHERE id = ?",
                        (row['content_id'],),
                    )
            except Exception as e:
                # One bad row must not abort the batch; log and continue.
                logger.warning(f"记录分发结果失败: {row.get('entry_id')} -> {row.get('email')}, 错误: {e}")
        conn.commit()
        logger.info(f"已记录 {inserted} 条分发记录到数据库批次ID: {batch_id}")
        return True
    except Exception as e:
        logger.error(f"记录分发结果到数据库失败: {e}")
        conn.rollback()
        return False
def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir):
    """Produce per-article distribution statistics and summary files.

    Joins the allocation CSV with send results, tallies per-article
    assignment/send counts, enriches each article from the contents table,
    writes a summary CSV + stats JSON, and logs aggregate/per-product stats.

    Returns:
        Path of the stats JSON on success; None implicitly when there was
        nothing to report, or explicitly on error.
    """
    try:
        dist_df = pd.read_csv(distribution_csv)
        # Attach each recipient's send status, if a result CSV exists.
        if send_result_csv and os.path.exists(send_result_csv):
            result_df = pd.read_csv(send_result_csv)
            email_status = {}
            for _, row in result_df.iterrows():
                email_status[row['email']] = row['status']
            dist_df['send_status'] = dist_df['email'].map(email_status)
        else:
            dist_df['send_status'] = 'unknown'
        # Tally per-article counters keyed by entry_id.
        article_stats = {}
        for _, row in dist_df.iterrows():
            entry_id = row['entry_id']
            if entry_id not in article_stats:
                article_stats[entry_id] = {
                    'content_id': row['content_id'],
                    'total_assigned': 0,
                    'sent_success': 0,
                    'sent_failed': 0,
                    'sent_unknown': 0,
                    'product': row.get('product', ''),
                    'object': row.get('object', ''),
                    'date': row.get('date', ''),
                    'logic': row.get('logic', '')
                }
            article_stats[entry_id]['total_assigned'] += 1
            # 'success (test mode)' also counts as a success here.
            if 'success' in str(row['send_status']).lower():
                article_stats[entry_id]['sent_success'] += 1
            elif row['send_status'] in ['failed', 'skipped']:
                article_stats[entry_id]['sent_failed'] += 1
            else:
                article_stats[entry_id]['sent_unknown'] += 1
        # Build one concise summary row per article, enriched from the DB.
        summary_data = []
        cursor = conn.cursor()
        for entry_id, stats in article_stats.items():
            cursor.execute("SELECT * FROM contents WHERE id = ?", (stats['content_id'],))
            content = cursor.fetchone()
            if content:
                summary_data.append({
                    'entry_id': entry_id,
                    'content_id': stats['content_id'],
                    # Prefer CSV values; fall back to the DB row when empty.
                    'product': stats['product'] or content['product'],
                    'object': stats['object'] or content['object'],
                    'date': stats['date'] or content['date'],
                    'logic': stats['logic'] or content['logic'],
                    'judge_status': content['judge_status'],
                    'is_distributed': content['is_distributed'],
                    'assigned': stats['total_assigned'],
                    'success': stats['sent_success'],
                    'failed': stats['sent_failed'],
                    'unknown': stats['sent_unknown']
                })
            else:
                # Content row missing from the DB: CSV-only fields, no
                # judge_status/is_distributed columns for this row.
                summary_data.append({
                    'entry_id': entry_id,
                    'content_id': stats['content_id'],
                    'product': stats['product'],
                    'object': stats['object'],
                    'date': stats['date'],
                    'logic': stats['logic'],
                    'assigned': stats['total_assigned'],
                    'success': stats['sent_success'],
                    'failed': stats['sent_failed'],
                    'unknown': stats['sent_unknown']
                })
        if summary_data:
            summary_df = pd.DataFrame(summary_data)
            summary_csv = os.path.join(output_dir, f"distribution_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            summary_df.to_csv(summary_csv, index=False)
            logger.info(f"分发摘要报告已保存到: {summary_csv}")
            # Dump the raw per-article counters as JSON.
            stats_json = os.path.join(output_dir, f"article_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
            with open(stats_json, 'w') as f:
                json.dump(article_stats, f, indent=2)
            # Whole-table aggregates.
            # NOTE(review): if the contents table is empty the percentage
            # below divides by zero; the outer except then returns None.
            cursor.execute("SELECT COUNT(*) FROM contents")
            total_articles = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM contents WHERE is_distributed = 1")
            distributed_articles = cursor.fetchone()[0]
            success_sent = sum(1 for _, v in article_stats.items() if v['sent_success'] > 0)
            logger.info(f"文章统计已保存到: {stats_json}")
            logger.info("\n===== 分发统计 =====")
            logger.info(f"总文章数: {total_articles}")
            logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
            logger.info(f"本次成功发送文章数: {success_sent}")
            # Per-product distribution coverage.
            cursor.execute("""
                SELECT product,
                       COUNT(*) as total,
                       SUM(CASE WHEN is_distributed = 1 THEN 1 ELSE 0 END) as distributed
                FROM contents
                GROUP BY product
            """)
            product_stats = cursor.fetchall()
            if product_stats:
                logger.info("\n===== 按产品分发统计 =====")
                for row in product_stats:
                    product = row['product']
                    total = row['total']
                    distributed = row['distributed']
                    if total > 0:
                        logger.info(f"产品 '{product}': {distributed}/{total} ({distributed/total*100:.2f}%)")
            return stats_json
    except Exception as e:
        logger.error(f"生成分发报告失败: {e}")
        return None
def main():
    """End-to-end pipeline: query, allocate, email, record, report."""
    args = parse_arguments()
    os.makedirs(args.output_dir, exist_ok=True)

    conn = create_database_connection(args.db_path)
    if not conn:
        logger.error("无法连接到数据库,程序退出")
        return
    try:
        # Step 1: pull candidate content, honoring CLI filters.
        contents = query_contents_from_database(
            conn,
            product=args.product,
            object=args.object,
            judge_only_success=args.judge_only_success,
            undistributed_only=args.undistributed_only,
        )
        if contents is None or len(contents) == 0:
            logger.error("没有找到符合条件的内容,程序退出")
            return

        # Step 2: pull recipients, honoring CLI filters.
        users = query_users_from_database(
            conn,
            user_id=args.user_id,
            user_email=args.user_email,
            max_users=args.max_users,
        )
        if users is None or len(users) == 0:
            logger.error("没有找到符合条件的用户,程序退出")
            return

        # Step 3: assign content to users round-robin.
        allocation = allocate_content_to_users(users, contents, args.article_per_user)
        if not allocation:
            logger.error("内容分配失败,程序退出")
            return

        # Step 4: flatten the allocation to the distribution CSV.
        distribution_csv = prepare_distribution_csv(allocation, args.output_dir)
        if not distribution_csv:
            logger.error("准备分发CSV失败程序退出")
            return

        # Step 5: send the emails (or simulate them in test mode).
        send_result_csv = send_emails(
            distribution_csv, args.output_dir, args.email_from, args.email_password,
            args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode,
            args.delay, args.zip_filename,
        )

        # Step 6: persist outcomes (skipped in test mode) and report.
        if args.test_mode:
            logger.info("测试模式,不记录分发结果到数据库")
        else:
            record_distribution_to_database(conn, distribution_csv, send_result_csv)
        generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir)
        logger.info("内容分发流程完成")
    finally:
        conn.close()


if __name__ == "__main__":
    main()