Adjusted the distribution behavior when there is not enough content to allocate

jinye_huang · 2025-05-16 17:19:04 +08:00
commit dc981053fe (parent 5f1a75f017)
5 changed files with 555 additions and 45 deletions
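In plain terms, the core change is in allocate_content_to_users: when the content pool runs out, allocation now stops instead of looping back over the same items, and users who end up with fewer articles than requested are collected and returned alongside the main list. Below is a minimal sketch of that rule, stripped of the pandas and logging plumbing the real function uses; the names are illustrative, not the script's own.

import random

def allocate(users, contents, per_user):
    # Shuffle once, then hand out items in order; do NOT wrap around when exhausted.
    random.shuffle(contents)
    allocated, short = [], []
    idx = 0
    for user in users:
        got = []
        for _ in range(per_user):
            if idx >= len(contents):
                break  # content exhausted: stop instead of recycling
            got.append(contents[idx])
            idx += 1
        allocated.append({"user": user, "contents": got})
        if len(got) < per_user:
            short.append({"user": user, "expected": per_user, "actual": len(got)})
    return allocated, short

# Example: 3 users but only 2 items -> the last user is reported as under-allocated.
# allocate(["u1", "u2", "u3"], ["a", "b"], per_user=1)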

scripts/distribution/distribute_content.py (Normal file → Executable file)

@ -83,6 +83,9 @@ def parse_arguments():
help='每封邮件发送之间的延迟时间(秒)')
parser.add_argument('--zip-filename', type=str, default=None,
help='指定ZIP压缩包的基本文件名（不含扩展名），例如"文旅小红书带货笔记内容0512"')
# 新增强制性附件参数
parser.add_argument('--force-attachments', type=str, default=None,
help='强制添加的附件文件路径,多个文件用逗号分隔')
return parser.parse_args()
@ -225,6 +228,8 @@ def allocate_content_to_users(users_df, content_df, article_per_user):
try:
# 创建用户列表
users = []
users_with_insufficient_content = [] # 新增:跟踪内容不足的用户
for _, row in users_df.iterrows():
users.append({
'id': row['id'],
@ -237,7 +242,7 @@ def allocate_content_to_users(users_df, content_df, article_per_user):
content_list = content_df.to_dict('records')
if not content_list:
logger.warning("没有可用内容进行分配")
return []
return [], []
# 随机打乱内容列表
random.shuffle(content_list)
@ -248,10 +253,22 @@ def allocate_content_to_users(users_df, content_df, article_per_user):
for user in users:
user_contents = []
expected_count = article_per_user # 预期内容数量
for _ in range(article_per_user):
if content_index >= len(content_list):
content_index = 0 # 如果内容不够,循环使用
logger.warning("内容不足,将循环使用现有内容")
# 不再循环使用内容,而是停止分配
logger.warning("内容不足,只分配可用内容")
# 如果用户获得的内容少于预期,记录下来
if len(user_contents) < expected_count:
users_with_insufficient_content.append({
'user_id': user['id'],
'email': user['email'],
'username': user['username'],
'expected_content': expected_count,
'actual_content': len(user_contents)
})
break
content = content_list[content_index]
user_contents.append(content)
@ -260,14 +277,28 @@ def allocate_content_to_users(users_df, content_df, article_per_user):
user['contents'] = user_contents
# 检查是否获得了预期数量的内容
if len(user_contents) < expected_count:
users_with_insufficient_content.append({
'user_id': user['id'],
'email': user['email'],
'username': user['username'],
'expected_content': expected_count,
'actual_content': len(user_contents)
})
logger.info(f"已为 {len(users)} 个用户分配 {len(content_allocated)} 条内容")
unique_content_count = len(set([c['EntryID'] for c in content_allocated]))
logger.info(f"分配的唯一内容条数: {unique_content_count}")
return users
if users_with_insufficient_content:
logger.warning(f"{len(users_with_insufficient_content)} 个用户获得的内容少于预期数量")
# 返回用户列表和内容不足的用户列表
return users, users_with_insufficient_content
except Exception as e:
logger.error(f"分配内容失败: {e}")
return []
return [], []
def find_additional_images(file_path):
"""查找与文章相关的所有额外图片"""
@ -296,7 +327,7 @@ def find_additional_images(file_path):
logger.info(f"找到 {len(image_files)} 张额外图片: {additional_images_dir}")
return image_files
def prepare_distribution_csv(users_with_content, output_dir):
def prepare_distribution_csv(users_with_content, output_dir, users_with_insufficient_content=None):
"""准备分发CSV文件"""
try:
os.makedirs(output_dir, exist_ok=True)
@ -331,6 +362,14 @@ def prepare_distribution_csv(users_with_content, output_dir):
df.to_csv(distribution_csv, index=False)
logger.info(f"分发CSV已保存到: {distribution_csv}")
# 如果有内容不足的用户，导出到CSV
if users_with_insufficient_content:
insufficient_df = pd.DataFrame(users_with_insufficient_content)
insufficient_csv = os.path.join(output_dir, f"insufficient_content_users_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
insufficient_df.to_csv(insufficient_csv, index=False)
logger.info(f"内容不足的用户列表已保存到: {insufficient_csv}")
return distribution_csv
except Exception as e:
logger.error(f"准备分发CSV失败: {e}")
@ -399,7 +438,7 @@ def send_single_email(from_addr, password, to_addr, subject, content, attachment
return False
def send_emails(distribution_csv, output_dir, email_from, email_password,
subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None, test_email=None):
subject, smtp_server, smtp_port, use_ssl, test_mode, delay=2, zip_filename=None, test_email=None, force_attachments=None):
"""发送邮件"""
try:
# 确保输出目录存在
@ -413,36 +452,63 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
if test_email:
logger.info(f"测试邮箱模式已启用,所有邮件将发送到: {test_email}")
# 按邮箱分组
email_groups = {}
for _, row in df.iterrows():
email = row['email']
if email not in email_groups:
email_groups[email] = []
email_groups[email].append(row)
# 处理强制性附件
forced_attachments = []
if force_attachments:
for attachment_path in force_attachments.split(','):
attachment_path = attachment_path.strip()
if os.path.exists(attachment_path):
if os.path.isdir(attachment_path):
# 如果是目录,添加目录下所有文件
for root, _, files in os.walk(attachment_path):
for file in files:
file_path = os.path.join(root, file)
forced_attachments.append(file_path)
logger.info(f"添加强制性附件目录文件: {file_path}")
else:
# 如果是文件,直接添加
forced_attachments.append(attachment_path)
logger.info(f"添加强制性附件: {attachment_path}")
else:
logger.warning(f"强制性附件不存在,跳过: {attachment_path}")
logger.info(f"共有 {len(email_groups)} 个邮箱需要发送")
logger.info(f"共添加 {len(forced_attachments)} 个强制性附件")
# 按用户ID分组而不是按邮箱分组，确保即使邮箱相同也会发送多封邮件
user_groups = {}
for _, row in df.iterrows():
user_id = row['user_id']
if user_id not in user_groups:
user_groups[user_id] = []
user_groups[user_id].append(row)
logger.info(f"共有 {len(user_groups)} 个用户需要发送")
# 结果记录
results = {
"total": len(email_groups),
"total": len(user_groups),
"success": 0,
"failed": 0,
"details": []
}
# 发送邮件
for email, rows in email_groups.items():
for user_id, rows in user_groups.items():
try:
logger.info(f"处理邮箱: {email}")
# 获取用户邮箱
email = rows[0]['email']
logger.info(f"处理用户: {user_id}, 邮箱: {email}")
# 收集所有文件路径
files = []
article_contents = []
valid_article_count = 0
for row in rows:
# 收集文本文件并读取内容
file_path = row.get('file_path')
has_article_content = False
if file_path and not pd.isna(file_path) and os.path.exists(file_path):
files.append(file_path)
@ -450,9 +516,10 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
try:
with open(file_path, 'r', encoding='utf-8') as f:
article_content = f.read()
# product_name = row.get('product', '')
# object_name = row.get('object', '')
article_contents.append(f"\n\n{article_content}\n\n{'='*50}\n\n")
if article_content.strip(): # 确保内容不为空
article_contents.append(f"\n\n{article_content}\n\n{'='*50}\n\n")
has_article_content = True
valid_article_count += 1
except Exception as e:
logger.warning(f"读取文章内容失败: {file_path}, 错误: {e}")
@ -468,13 +535,77 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
if img_path.strip() and os.path.exists(img_path):
files.append(img_path)
if not files:
logger.warning(f"邮箱 {email} 没有有效的附件文件,跳过")
# 添加强制性附件
if forced_attachments:
files.extend(forced_attachments)
# 检查是否有有效的文件和文章内容
if not files or valid_article_count == 0:
logger.warning(f"用户 {user_id} (邮箱: {email}) 没有有效的文件或文章内容,尝试查找替代内容")
# 尝试查找替代内容
try:
# 连接数据库
db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
# 查询一篇审核通过但未分发的内容
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM contents
WHERE judge_status = 1 AND is_distributed = 0
LIMIT 1
""")
alt_content = cursor.fetchone()
if alt_content:
logger.info(f"找到替代内容: {alt_content['entry_id']}")
# 清空之前的内容
files = []
article_contents = []
# 添加替代内容
alt_file_path = alt_content['output_txt_path']
if alt_file_path and os.path.exists(alt_file_path):
files.append(alt_file_path)
# 读取文章内容
try:
with open(alt_file_path, 'r', encoding='utf-8') as f:
alt_article_content = f.read()
article_contents.append(f"\n\n{alt_article_content}\n\n{'='*50}\n\n")
valid_article_count = 1
except Exception as e:
logger.warning(f"读取替代内容失败: {alt_file_path}, 错误: {e}")
# 添加海报和额外图片
alt_poster_path = alt_content['poster_path']
if alt_poster_path and os.path.exists(alt_poster_path):
files.append(alt_poster_path)
# 寻找额外图片
alt_additional_images = find_additional_images(alt_file_path)
for img_path in alt_additional_images:
if os.path.exists(img_path):
files.append(img_path)
else:
logger.warning("未找到替代内容")
conn.close()
except Exception as e:
logger.error(f"查找替代内容时出错: {e}")
# 再次检查是否有有效的文件和内容
if not files or valid_article_count == 0:
logger.warning(f"用户 {user_id} (邮箱: {email}) 没有有效的附件文件或文章内容,跳过")
results["details"].append({
"user_id": rows[0]['user_id'],
"user_id": user_id,
"email": email,
"status": "skipped",
"reason": "没有有效的附件文件",
"reason": "没有有效的附件文件或文章内容",
"files": []
})
results["failed"] += 1
@ -482,14 +613,14 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
# 构建邮件内容
files_count = len(files)
entries_count = len(rows)
entries_count = valid_article_count
# 基本邮件内容
base_email_content = f"""
您好，请查收今日带货笔记文案+配图，内容在邮件正文和附件中，具体挂载商品等操作流程请查看对应达人微信群内信息
优化建议
所有提供的材料仅为参考，支持自行修改
所有提供的材料仅为参考，支持自行修改。您可以参考我们整理给您的<小红书爆款标题公式文件>，针对标题进行修改仿写
标题优化和格式的规整更有利于带来小眼睛
本测试提供的商品均为近期高佣金筛选后的优质商单，质量保证
@ -502,9 +633,9 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
# 发送邮件
if test_mode:
logger.info(f"测试模式: 模拟发送邮件到 {email}")
logger.info(f"测试模式: 模拟发送邮件到用户 {user_id} (邮箱: {email})")
results["details"].append({
"user_id": rows[0]['user_id'],
"user_id": user_id,
"email": email,
"status": "success (test mode)",
"files": files
@ -532,7 +663,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
if success:
results["success"] += 1
results["details"].append({
"user_id": rows[0]['user_id'],
"user_id": user_id,
"email": email,
"status": "success",
"files": files
@ -540,7 +671,7 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
else:
results["failed"] += 1
results["details"].append({
"user_id": rows[0]['user_id'],
"user_id": user_id,
"email": email,
"status": "failed",
"reason": "SMTP发送失败",
@ -553,9 +684,10 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
time.sleep(delay)
except Exception as e:
logger.error(f"处理邮箱 {email} 时出错: {e}")
logger.error(f"处理用户 {user_id} (邮箱: {email}) 时出错: {e}")
results["failed"] += 1
results["details"].append({
"user_id": user_id,
"email": email,
"status": "failed",
"reason": str(e),
@ -564,9 +696,9 @@ def send_emails(distribution_csv, output_dir, email_from, email_password,
# 打印发送结果摘要
logger.info("\n===== 邮件发送结果摘要 =====")
logger.info(f"总计: {results['total']}邮箱")
logger.info(f"成功: {results['success']}邮箱")
logger.info(f"失败: {results['failed']}邮箱")
logger.info(f"总计: {results['total']}用户")
logger.info(f"成功: {results['success']}用户")
logger.info(f"失败: {results['failed']}用户")
# 将结果写入CSV文件
try:
@ -651,7 +783,7 @@ def record_distribution_to_database(conn, distribution_csv, send_result_csv):
conn.rollback()
return False
def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir):
def generate_distribution_report(conn, distribution_csv, send_result_csv, output_dir, insufficient_content_users=None):
"""生成分发报告"""
try:
# 读取分发CSV
@ -744,6 +876,48 @@ def generate_distribution_report(conn, distribution_csv, send_result_csv, output
with open(stats_json, 'w') as f:
json.dump(article_stats, f, indent=2)
# 如果有内容不足的用户,重新导出一份详细报告
if insufficient_content_users:
detailed_insufficient_users = []
# 获取收件用户的实际发送情况
user_send_stats = {}
for _, row in dist_df.iterrows():
user_id = row['user_id']
if user_id not in user_send_stats:
user_send_stats[user_id] = {
'email': row['email'],
'username': row['username'],
'sent_count': 0,
'success_count': 0
}
user_send_stats[user_id]['sent_count'] += 1
if 'success' in str(row.get('send_status', '')).lower():
user_send_stats[user_id]['success_count'] += 1
# 合并内容不足用户的实际发送情况
for user in insufficient_content_users:
user_id = user['user_id']
send_info = user_send_stats.get(user_id, {'sent_count': 0, 'success_count': 0})
detailed_insufficient_users.append({
'user_id': user['user_id'],
'email': user['email'],
'username': user['username'],
'expected_content': user['expected_content'],
'allocated_content': user['actual_content'],
'sent_count': send_info['sent_count'],
'success_count': send_info['success_count']
})
# 导出详细的内容不足用户报告
if detailed_insufficient_users:
detailed_df = pd.DataFrame(detailed_insufficient_users)
detailed_csv = os.path.join(output_dir, f"insufficient_content_users_detailed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
detailed_df.to_csv(detailed_csv, index=False)
logger.info(f"内容不足用户的详细报告已保存到: {detailed_csv}")
# 统计摘要
cursor.execute("SELECT COUNT(*) FROM contents")
total_articles = cursor.fetchone()[0]
@ -759,6 +933,10 @@ def generate_distribution_report(conn, distribution_csv, send_result_csv, output
logger.info(f"已分配文章数: {distributed_articles} ({distributed_articles/total_articles*100:.2f}%)")
logger.info(f"本次成功发送文章数: {success_sent}")
# 内容不足的用户统计
if insufficient_content_users:
logger.info(f"内容不足的用户数: {len(insufficient_content_users)}")
# 按产品统计
cursor.execute("""
SELECT product,
@ -822,7 +1000,7 @@ def main():
return
# 为用户分配内容
users_with_content = allocate_content_to_users(
users_with_content, insufficient_content_users = allocate_content_to_users(
users_df, content_df, args.article_per_user
)
@ -831,7 +1009,7 @@ def main():
return
# 准备分发CSV
distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir)
distribution_csv = prepare_distribution_csv(users_with_content, args.output_dir, insufficient_content_users)
if not distribution_csv:
logger.error("准备分发CSV失败程序退出")
return
@ -840,7 +1018,7 @@ def main():
send_result_csv = send_emails(
distribution_csv, args.output_dir, args.email_from, args.email_password,
args.subject, args.smtp_server, args.smtp_port, args.use_ssl, args.test_mode,
args.delay, args.zip_filename, args.test_email
args.delay, args.zip_filename, args.test_email, args.force_attachments
)
# 记录分发结果到数据库
@ -853,7 +1031,7 @@ def main():
logger.info("测试邮箱模式,不记录分发结果到数据库")
# 生成分发报告
generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir)
generate_distribution_report(conn, distribution_csv, send_result_csv, args.output_dir, insufficient_content_users)
logger.info("内容分发流程完成")


@ -14,8 +14,10 @@ DB_PATH="$BASE_DIR/distribution.db"
# 设置邮件相关变量
EMAIL_FROM="zwysendemail@163.com"
EMAIL_PASSWORD="NMhVGFmCJkGEy3B5"
SUBJECT="文旅小红书带货笔记内容0515"
ZIP_FILENAME="文旅小红书带货笔记内容0515"
# EMAIL_FROM="zowoyomedia@163.com"
# EMAIL_PASSWORD="SDj5fK6Tk9YevmsD"
SUBJECT="文旅小红书带货笔记内容0516"
ZIP_FILENAME="文旅小红书带货笔记内容0516"
# 设置分发配置
ARTICLE_PER_USER=1
@ -31,12 +33,15 @@ UNDISTRIBUTED_ONLY=true # 只分发未分发的内容
# 内容筛选配置
TARGET_PRODUCT="" # 为空则不筛选特定产品
TARGET_OBJECT="极爽冲浪馆" # 为空则不筛选特定景点
TARGET_OBJECT="深圳青青世界酒店" # 为空则不筛选特定景点
# 用户筛选配置
TARGET_USER_ID="" # 为空则不筛选特定用户ID
TARGET_USER_EMAIL="" # 为空则不筛选特定用户邮箱
# 强制性附件配置
FORCE_ATTACHMENTS="/root/autodl-tmp/TravelContentCreator/hotel_img/标题参考格式-精选.txt"
# 创建必要的目录
mkdir -p "$LOG_DIR"
mkdir -p "$OUTPUT_DIR"
@ -85,6 +90,20 @@ if [ "$TEST_EMAIL_MODE" = true ]; then
fi
fi
# 检查强制性附件配置
if [ -n "$FORCE_ATTACHMENTS" ]; then
echo "已配置强制性附件: $FORCE_ATTACHMENTS"
# 验证文件是否存在
IFS=',' read -ra ATTACHMENT_ARRAY <<< "$FORCE_ATTACHMENTS"
for attachment in "${ATTACHMENT_ARRAY[@]}"; do
if [ ! -e "$attachment" ]; then
echo "警告: 强制性附件文件或目录不存在: $attachment"
else
echo "已确认强制性附件存在: $attachment"
fi
done
fi
# 数据库状态统计
echo "数据库状态统计:"
sqlite3 "$DB_PATH" <<EOF
@ -161,6 +180,12 @@ if [ "$TEST_EMAIL_MODE" = true ]; then
echo "测试邮箱模式: 所有邮件将发送到 $TEST_EMAIL"
fi
# 强制性附件设置
if [ -n "$FORCE_ATTACHMENTS" ]; then
CMD_ARGS="$CMD_ARGS --force-attachments \"$FORCE_ATTACHMENTS\""
echo "已添加强制性附件到命令参数中"
fi
# 设置每用户文章数量
CMD_ARGS="$CMD_ARGS --article-per-user $ARTICLE_PER_USER"


@ -535,8 +535,8 @@ def main():
args = parser.parse_args()
# 默认值设置
source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-14_22-10-37"
output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-14_22-10-37"
source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-15_20-01-48"
output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-15_20-01-48"
run_id = args.run_id if args.run_id else os.path.basename(source)
prefer_original = args.prefer_original
db_path = args.db_path if args.db_path else '/root/autodl-tmp/TravelContentCreator/distribution.db'


@ -0,0 +1,106 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sqlite3
import sys
# 数据库路径
db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
def main():
print(f"开始修复数据库: {db_path}")
# 连接数据库
conn = None
try:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = OFF") # 禁用外键约束
cursor = conn.cursor()
print(f"已连接到数据库")
# 备份用户数据
print("正在备份用户数据...")
users_data = []
try:
cursor.execute("SELECT id, email, username, created_at FROM users")
users_data = cursor.fetchall()
print(f"成功备份 {len(users_data)} 条用户记录")
except sqlite3.Error as e:
print(f"备份用户数据时出错: {e}")
print("继续执行修复操作...")
# 检查数据库表结构
print("检查数据库表结构...")
try:
cursor.execute("PRAGMA table_info(users)")
columns = {col[1] for col in cursor.fetchall()}
# 打印当前表结构
print(f"当前用户表字段: {', '.join(columns)}")
# 创建新的用户表
print("创建新的用户表结构...")
cursor.execute("""
CREATE TABLE IF NOT EXISTS users_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
username TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
xhs_user_id TEXT UNIQUE
)
""")
# 复制用户数据到新表
if users_data:
print("将数据迁移到新表...")
for user in users_data:
user_id, email, username, created_at = user
# xhs_user_id 暂时留空（NULL），稍后可以更新
cursor.execute("""
INSERT INTO users_new (id, email, username, created_at, xhs_user_id)
VALUES (?, ?, ?, ?, NULL)
""", (user_id, email, username, created_at))
print(f"已将 {len(users_data)} 条记录迁移到新表")
# 查询是否存在users表如果存在就删除
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
if cursor.fetchone():
cursor.execute("DROP TABLE users")
print("已删除旧的users表")
# 重命名新表
cursor.execute("ALTER TABLE users_new RENAME TO users")
print("已将新表重命名为users")
# 为邮箱创建非唯一索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
print("已为email字段创建索引")
# 为xhs_user_id创建唯一索引
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_xhs_id ON users(xhs_user_id) WHERE xhs_user_id IS NOT NULL")
print("已为xhs_user_id字段创建唯一索引")
# 提交事务
conn.commit()
print("数据库结构修复完成!")
except sqlite3.Error as e:
print(f"修复表结构时出错: {e}")
conn.rollback()
return False
except sqlite3.Error as e:
print(f"数据库连接错误: {e}")
return False
finally:
print("关闭数据库连接")
if conn:
conn.close()
return True
if __name__ == "__main__":
if main():
print("数据库修复成功!")
else:
print("数据库修复失败!")
sys.exit(1)
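A quick read-only check (my own sketch, not part of this commit) to confirm the rebuilt users table has the xhs_user_id column and the two indexes the migration above creates:

import sqlite3

db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
conn = sqlite3.connect(db_path)
cur = conn.cursor()

cur.execute("PRAGMA table_info(users)")
print("columns:", [row[1] for row in cur.fetchall()])   # expect 'xhs_user_id' to be listed

cur.execute("PRAGMA index_list(users)")
print("indexes:", [row[1] for row in cur.fetchall()])    # expect idx_users_email and idx_users_xhs_id

conn.close()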


@ -0,0 +1,201 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sqlite3
import csv
import sys
# CSV文件路径
csv_file = '/root/autodl-tmp/TravelContentCreator/output/5.15.csv'
# 数据库路径
db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
def main():
# 检查CSV文件是否存在
if not os.path.exists(csv_file):
print(f"错误: CSV文件不存在: {csv_file}")
return False
# 连接数据库
try:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = OFF") # 禁用外键约束,避免可能的导入问题
cursor = conn.cursor()
print(f"已连接到数据库: {db_path}")
# 检查数据库表结构，确保xhs_user_id字段存在
print("检查数据库表结构...")
cursor.execute("PRAGMA table_info(users)")
columns = {col[1] for col in cursor.fetchall()}
if 'xhs_user_id' not in columns:
print("错误: 数据库users表中不存在xhs_user_id字段")
print("请先运行 fix_database.py 脚本修复数据库结构")
return False
print(f"当前用户表字段: {', '.join(columns)}")
# 清除所有历史用户数据
clear_users = input("是否清除所有历史用户数据? (y/n): ").strip().lower()
if clear_users == 'y':
try:
# 先检查是否有关联的分发记录
cursor.execute("SELECT COUNT(*) FROM distributions")
distribution_count = cursor.fetchone()[0]
if distribution_count > 0:
confirm_delete = input(f"警告: 数据库中有 {distribution_count} 条分发记录与用户关联,确定要删除所有用户吗? (y/n): ").strip().lower()
if confirm_delete != 'y':
print("取消清除用户数据")
return False
# 清除所有用户数据
cursor.execute("DELETE FROM users")
conn.commit()
print(f"已清除所有历史用户数据")
except sqlite3.Error as e:
print(f"清除用户数据时出错: {e}")
conn.rollback()
return False
# 检查数据库中已存在的小红书User码
cursor.execute("SELECT xhs_user_id FROM users WHERE xhs_user_id IS NOT NULL")
existing_user_ids = set(row[0] for row in cursor.fetchall() if row[0])
print(f"数据库中已有 {len(existing_user_ids)} 个带有小红书User码的用户")
except sqlite3.Error as e:
print(f"数据库连接错误: {e}")
return False
# 计数器
success_count = 0
skip_count = 0
error_count = 0
update_count = 0
# 记录失败和跳过的条目
failed_entries = []
skipped_entries = []
# 读取CSV文件并导入数据
try:
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
# 跳过标题行
headers = next(reader)
print(f"CSV文件标题: {headers}")
for row_num, row in enumerate(reader, 2): # 从2开始计数，因为第1行是标题
if len(row) >= 4: # 确保行至少有4列
try:
# 从CSV提取数据 (小红书User码, 小红书ID, 粉丝数, 邮箱)
xhs_user_id, username, fans, email = row
# 如果邮箱为空,跳过
if not email:
skip_info = f"邮箱为空"
skipped_entries.append((row_num, row, skip_info))
print(f"警告: 跳过无邮箱用户: {username} (行 {row_num})")
skip_count += 1
continue
# 确保邮箱格式正确
if '@' not in email:
error_info = f"无效邮箱格式"
failed_entries.append((row_num, row, error_info))
print(f"警告: 跳过无效邮箱: {email} (行 {row_num})")
error_count += 1
continue
# 检查小红书User码是否有效
if not xhs_user_id:
error_info = f"小红书User码为空"
failed_entries.append((row_num, row, error_info))
print(f"警告: 小红书User码为空: {email} (行 {row_num})")
error_count += 1
continue
# 检查小红书User码是否已存在，如果存在则更新信息
if xhs_user_id in existing_user_ids:
print(f"用户User码已存在将更新: {xhs_user_id} -> {username}, {email}")
cursor.execute("""
UPDATE users
SET username = ?, email = ?
WHERE xhs_user_id = ?
""", (username, email, xhs_user_id))
update_count += 1
skipped_entries.append((row_num, row, "用户User码已存在，已更新信息"))
else:
# 插入新用户
cursor.execute("""
INSERT INTO users (email, username, xhs_user_id)
VALUES (?, ?, ?)
""", (email, username, xhs_user_id))
existing_user_ids.add(xhs_user_id) # 添加到已存在列表
success_count += 1
print(f"插入新用户: {xhs_user_id} -> {username}, {email}")
except sqlite3.IntegrityError as e:
# 处理唯一约束冲突
if "UNIQUE constraint failed: users.xhs_user_id" in str(e):
error_info = f"小红书User码重复"
failed_entries.append((row_num, row, error_info))
print(f"错误: 小红书User码重复: {xhs_user_id} (行 {row_num})")
else:
error_info = f"数据库完整性错误: {e}"
failed_entries.append((row_num, row, error_info))
print(f"插入行时出错 (行 {row_num}): {row}, 错误: {e}")
error_count += 1
except Exception as e:
error_info = f"数据库错误: {e}"
failed_entries.append((row_num, row, error_info))
print(f"插入行时出错 (行 {row_num}): {row}, 错误: {e}")
error_count += 1
else:
error_info = f"列数不足需要至少4列"
failed_entries.append((row_num, row, error_info))
print(f"警告: 跳过格式不正确的行 {row_num}: {row}")
error_count += 1
# 提交事务
conn.commit()
print(f"\n导入完成! 成功插入: {success_count}, 更新: {update_count}, 跳过: {skip_count}, 失败: {error_count}")
# 如果有失败的条目,输出详细信息
if failed_entries:
print("\n===== 导入失败的记录 =====")
for row_num, row, error in failed_entries:
print(f"{row_num}: {row} - 失败原因: {error}")
# 将失败记录保存到文件
failure_file = os.path.join(os.path.dirname(csv_file), "import_failures_5.15.csv")
with open(failure_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['行号', '原始数据', '失败原因'])
for row_num, row, error in failed_entries:
writer.writerow([row_num, ','.join(row), error])
print(f"\n失败记录已保存到: {failure_file}")
# 如果有跳过的条目,输出详细信息
if skipped_entries:
print("\n===== 跳过的记录 =====")
for row_num, row, reason in skipped_entries:
print(f"{row_num}: {row} - 跳过原因: {reason}")
except Exception as e:
print(f"读取CSV文件时出错: {e}")
conn.rollback()
return False
finally:
conn.close()
return True
if __name__ == "__main__":
if main():
print("用户数据导入成功!")
else:
print("用户数据导入失败!")
sys.exit(1)
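An optional sanity check after the import (again a sketch, not part of the commit): count how many users rows carry an xhs_user_id to confirm the CSV rows landed as expected.

import sqlite3

conn = sqlite3.connect('/root/autodl-tmp/TravelContentCreator/distribution.db')
cur = conn.cursor()
cur.execute(
    "SELECT COUNT(*), "
    "SUM(CASE WHEN xhs_user_id IS NOT NULL THEN 1 ELSE 0 END) "
    "FROM users"
)
total, with_xhs = cur.fetchone()
print(f"users: {total}, with xhs_user_id: {with_xhs or 0}")
conn.close()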