#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sqlite3
import pandas as pd
import argparse
import logging
from datetime import datetime
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

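# Example invocations (illustrative; file names and values are assumptions,
# see parse_arguments() below for the actual flags):
#
#   python distribution_database.py init
#   python distribution_database.py import-contents --file manifest.csv
#   python distribution_database.py import-users --file users.csv
#   python distribution_database.py import-distribution --file sent.csv --batch-id batch_001
#   python distribution_database.py query-content --product "Hangzhou" --output status.csv
#   python distribution_database.py report --output report.csv --format csv
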
# Database schema definitions
DB_SCHEMA = {
    "contents": """
        CREATE TABLE IF NOT EXISTS contents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            entry_id TEXT NOT NULL UNIQUE,
            output_txt_path TEXT,
            poster_path TEXT,
            article_json_path TEXT,
            product TEXT,
            object TEXT,
            date TEXT,
            logic TEXT,
            judge_status INTEGER,
            is_distributed INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,
    "users": """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT NOT NULL UNIQUE,
            username TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,
    "distributions": """
        CREATE TABLE IF NOT EXISTS distributions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            content_id INTEGER,
            user_id INTEGER,
            distribution_date TIMESTAMP,
            send_status TEXT,
            batch_id TEXT,
            FOREIGN KEY (content_id) REFERENCES contents(id),
            FOREIGN KEY (user_id) REFERENCES users(id)
        )
    """
}

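# Relationships: distributions is a join table between contents and users
# (one row per content sent to one user in one batch). entry_id and email
# act as natural keys; the integer id columns are surrogate keys used in
# the foreign-key references above.
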
def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description='Content distribution database management')

    # Database path
    parser.add_argument('--db-path', type=str, default='/root/autodl-tmp/TravelContentCreator/distribution.db',
                        help='Path to the database file')

    # Subcommands
    subparsers = parser.add_subparsers(dest='command', help='Subcommand')

    # Initialize the database
    subparsers.add_parser('init', help='Initialize the database')

    # Import contents
    import_contents_parser = subparsers.add_parser('import-contents', help='Import a content manifest')
    import_contents_parser.add_argument('--file', type=str, required=True,
                                        help='Path to the content manifest CSV file')

    # Import users
    import_users_parser = subparsers.add_parser('import-users', help='Import user data')
    import_users_parser.add_argument('--file', type=str, required=True,
                                     help='Path to the user CSV file')
    import_users_parser.add_argument('--email-column', type=str, default='达人邮箱',
                                     help='Name of the email column')
    import_users_parser.add_argument('--username-column', type=str, default='小红书ID',
                                     help='Name of the username column')

    # Import distribution records
    import_dist_parser = subparsers.add_parser('import-distribution', help='Import distribution records')
    import_dist_parser.add_argument('--file', type=str, required=True,
                                    help='Path to the distribution record CSV file')
    import_dist_parser.add_argument('--batch-id', type=str, default=None,
                                    help='Distribution batch ID (defaults to the file name)')

    # Import distribution results (from the distribution script)
    import_dist_result_parser = subparsers.add_parser('import-distribution-result', help='Import distribution results')
    import_dist_result_parser.add_argument('--distribution-csv', type=str, required=True,
                                           help='Path to the distribution record CSV file')
    import_dist_result_parser.add_argument('--result-csv', type=str, required=True,
                                           help='Path to the send result CSV file')
    import_dist_result_parser.add_argument('--manifest-csv', type=str, required=True,
                                           help='Path to the content manifest CSV file')
    import_dist_result_parser.add_argument('--batch-id', type=str, default=None,
                                           help='Distribution batch ID (defaults to a timestamp)')

    # Query content distribution status
    query_content_parser = subparsers.add_parser('query-content', help='Query content distribution status')
    query_content_parser.add_argument('--entry-id', type=str, default=None,
                                      help='Content ID')
    query_content_parser.add_argument('--product', type=str, default=None,
                                      help='Product name')
    query_content_parser.add_argument('--object', type=str, default=None,
                                      help='Attraction name')
    query_content_parser.add_argument('--output', type=str, default=None,
                                      help='Path to the output CSV file')

    # Query contents received by a user
    query_user_parser = subparsers.add_parser('query-user', help='Query contents received by a user')
    query_user_parser.add_argument('--email', type=str, required=True,
                                   help='User email')
    query_user_parser.add_argument('--output', type=str, default=None,
                                   help='Path to the output CSV file')

    # Generate a statistics report
    report_parser = subparsers.add_parser('report', help='Generate a statistics report')
    report_parser.add_argument('--output', type=str, default=None,
                               help='Path to the output CSV file')
    report_parser.add_argument('--format', type=str, choices=['csv', 'json'], default='csv',
                               help='Output format (csv or json)')

    return parser.parse_args()

def create_connection(db_path):
    """Create a database connection."""
    try:
        conn = sqlite3.connect(db_path)
        # Enforce foreign-key constraints
        conn.execute("PRAGMA foreign_keys = ON")
        # Return rows as dict-like sqlite3.Row objects
        conn.row_factory = sqlite3.Row
        return conn
    except sqlite3.Error as e:
        logger.error(f"Failed to create database connection: {e}")
        return None

def init_database(conn):
    """Initialize the database, creating the table structure."""
    try:
        cursor = conn.cursor()
        # Create tables
        for table_name, create_sql in DB_SCHEMA.items():
            cursor.execute(create_sql)
            logger.info(f"Created or verified table '{table_name}'")
        # Create indexes
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_contents_entry_id ON contents(entry_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_dist_content_user ON distributions(content_id, user_id)")
        conn.commit()
        logger.info("Database initialized successfully")
        return True
    except sqlite3.Error as e:
        logger.error(f"Failed to initialize database: {e}")
        return False

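# A minimal programmatic setup sketch (the path is an assumption):
#
#     conn = create_connection('/tmp/distribution.db')
#     if conn:
#         init_database(conn)
#         conn.close()
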
def import_contents(conn, file_path):
    """Import a content manifest into the database."""
    try:
        # Read the CSV file
        df = pd.read_csv(file_path)
        required_columns = ['EntryID']
        # Check required columns
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.error(f"Content manifest CSV is missing required columns: {', '.join(missing_columns)}")
            return False
        # Record the row count before import
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM contents")
        before_count = cursor.fetchone()[0]
        # Import the data
        success_count = 0
        for _, row in df.iterrows():
            try:
                # Prepare the data
                data = {
                    'entry_id': row['EntryID'],
                    'output_txt_path': row.get('OutputTxtPath', ''),
                    'poster_path': row.get('PosterPath', ''),
                    'article_json_path': row.get('ArticleJsonPath', ''),
                    'product': row.get('Product', ''),
                    'object': row.get('Object', ''),
                    'date': row.get('Date', ''),
                    'logic': row.get('Logic', ''),
                    'judge_status': int(row.get('JudgeStatus', 0)) if 'JudgeStatus' in row else None,
                    'is_distributed': int(row.get('IsDistributed', 0)) if 'IsDistributed' in row else 0
                }
                # Insert or update the content row
                cursor.execute("""
                    INSERT OR REPLACE INTO contents
                    (entry_id, output_txt_path, poster_path, article_json_path,
                     product, object, date, logic, judge_status, is_distributed)
                    VALUES
                    (:entry_id, :output_txt_path, :poster_path, :article_json_path,
                     :product, :object, :date, :logic, :judge_status, :is_distributed)
                """, data)
                success_count += 1
            except Exception as e:
                logger.warning(f"Failed to import content {row.get('EntryID')}: {e}")
        conn.commit()
        # Record the row count after import
        cursor.execute("SELECT COUNT(*) FROM contents")
        after_count = cursor.fetchone()[0]
        logger.info(f"Content manifest import finished; imported {success_count} contents")
        logger.info(f"Database now holds {after_count} content rows, {after_count - before_count} added by this run")
        return True
    except Exception as e:
        logger.error(f"Failed to import content manifest: {e}")
        return False

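# Note: INSERT OR REPLACE resolves the UNIQUE(entry_id) conflict by deleting
# the old row and inserting a fresh one, so a re-imported content gets a new
# id and created_at, and with PRAGMA foreign_keys = ON the delete fails if
# distributions already references that content. An UPSERT (SQLite >= 3.24)
# would preserve the id; a two-column sketch of the alternative statement:
#
#     INSERT INTO contents (entry_id, product)
#     VALUES (:entry_id, :product)
#     ON CONFLICT(entry_id) DO UPDATE SET product = excluded.product
#
# The same trade-off applies to the user import below.
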
def import_users(conn, file_path, email_column='达人邮箱', username_column='小红书ID'):
    """Import user data into the database.

    The default column names match the Chinese headers of the source sheet:
    达人邮箱 (creator email) and 小红书ID (Xiaohongshu ID).
    """
    try:
        # Read the CSV file
        df = pd.read_csv(file_path)
        # Check required columns
        if email_column not in df.columns:
            logger.error(f"User CSV is missing the email column '{email_column}'")
            return False
        # Keep only rows with a plausible email address
        df = df[df[email_column].notna()]
        df = df[df[email_column].astype(str).str.contains('@')]
        # If the username column is absent, fall back to the email prefix
        if username_column not in df.columns:
            df[username_column] = df[email_column].apply(lambda x: x.split('@')[0])
        # Record the row count before import
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM users")
        before_count = cursor.fetchone()[0]
        # Import the data
        success_count = 0
        for _, row in df.iterrows():
            try:
                email = row[email_column]
                username = row[username_column] if pd.notna(row[username_column]) else email.split('@')[0]
                # Insert or update the user
                cursor.execute("""
                    INSERT OR REPLACE INTO users (email, username)
                    VALUES (?, ?)
                """, (email, username))
                success_count += 1
            except Exception as e:
                logger.warning(f"Failed to import user {row.get(email_column)}: {e}")
        conn.commit()
        # Record the row count after import
        cursor.execute("SELECT COUNT(*) FROM users")
        after_count = cursor.fetchone()[0]
        logger.info(f"User import finished; imported {success_count} users")
        logger.info(f"Database now holds {after_count} user rows, {after_count - before_count} added by this run")
        return True
    except Exception as e:
        logger.error(f"Failed to import user data: {e}")
        return False

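# Illustrative input for import_users (hypothetical rows; headers match the
# defaults above):
#
#     达人邮箱,小红书ID
#     alice@example.com,alice_xhs
#     bob@example.com,bob_travels
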
def import_distribution(conn, file_path, batch_id=None):
    """Import distribution records into the database."""
    try:
        # Read the CSV file
        df = pd.read_csv(file_path)
        # Check required columns
        required_columns = ['email', 'entry_id']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.error(f"Distribution record CSV is missing required columns: {', '.join(missing_columns)}")
            return False
        # Default the batch ID to the file name
        if not batch_id:
            batch_id = os.path.basename(file_path).split('.')[0]
        # Record the row count before import
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM distributions")
        before_count = cursor.fetchone()[0]
        # Import the data
        success_count = 0
        updated_count = 0
        for _, row in df.iterrows():
            try:
                email = row['email']
                entry_id = row['entry_id']
                send_status = row.get('send_status', 'unknown')
                # Look up the user ID
                cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
                user_result = cursor.fetchone()
                if not user_result:
                    # Unknown user: add them to the database
                    cursor.execute("""
                        INSERT INTO users (email, username)
                        VALUES (?, ?)
                    """, (email, row.get('username', email.split('@')[0])))
                    user_id = cursor.lastrowid
                else:
                    user_id = user_result[0]
                # Look up the content ID
                cursor.execute("SELECT id FROM contents WHERE entry_id = ?", (entry_id,))
                content_result = cursor.fetchone()
                if not content_result:
                    logger.warning(f"Content {entry_id} is not in the database; adding a stub row")
                    cursor.execute("""
                        INSERT INTO contents (entry_id)
                        VALUES (?)
                    """, (entry_id,))
                    content_id = cursor.lastrowid
                else:
                    content_id = content_result[0]
                # Check whether an identical record already exists
                cursor.execute("""
                    SELECT id FROM distributions
                    WHERE content_id = ? AND user_id = ? AND batch_id = ?
                """, (content_id, user_id, batch_id))
                if cursor.fetchone():
                    # Record exists: update its status
                    cursor.execute("""
                        UPDATE distributions
                        SET send_status = ?, distribution_date = CURRENT_TIMESTAMP
                        WHERE content_id = ? AND user_id = ? AND batch_id = ?
                    """, (send_status, content_id, user_id, batch_id))
                    updated_count += 1
                else:
                    # Add a new record
                    cursor.execute("""
                        INSERT INTO distributions
                        (content_id, user_id, distribution_date, send_status, batch_id)
                        VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?)
                    """, (content_id, user_id, send_status, batch_id))
                    success_count += 1
            except Exception as e:
                logger.warning(f"Failed to import distribution record {row.get('entry_id')} -> {row.get('email')}: {e}")
        conn.commit()
        # Record the row count after import
        cursor.execute("SELECT COUNT(*) FROM distributions")
        after_count = cursor.fetchone()[0]
        logger.info(f"Distribution import finished; added {success_count} records, updated {updated_count}")
        logger.info(f"Database now holds {after_count} distribution rows, {after_count - before_count} added by this run")
        return True
    except Exception as e:
        logger.error(f"Failed to import distribution records: {e}")
        conn.rollback()
        return False

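# Illustrative input for import_distribution (hypothetical rows):
#
#     email,entry_id,send_status
#     alice@example.com,20240501_0001,success
#     bob@example.com,20240501_0002,failed
#
# send_status is free-form; the reporting queries below match it with
# LIKE '%success%', so any status containing "success" counts as a success.
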
def import_distribution_result(conn, distribution_csv, result_csv, manifest_csv, batch_id=None):
    """Import the output of the distribution script into the database."""
    try:
        # Default the batch ID to a timestamp
        if not batch_id:
            batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        # 1. Import the content manifest
        logger.info(f"Importing contents from manifest: {manifest_csv}")
        import_contents(conn, manifest_csv)
        # 2. Import the distribution records
        logger.info(f"Importing distribution records: {distribution_csv}")
        dist_df = pd.read_csv(distribution_csv)
        # 3. Merge in the send results
        if os.path.exists(result_csv):
            logger.info(f"Merging send results: {result_csv}")
            result_df = pd.read_csv(result_csv)
            # Map each email to its send status
            email_status = {}
            for _, row in result_df.iterrows():
                email_status[row['email']] = row['status']
            # Attach the send status
            dist_df['send_status'] = dist_df['email'].map(email_status)
        else:
            logger.warning(f"Result CSV does not exist: {result_csv}; all records will be marked 'unknown'")
            dist_df['send_status'] = 'unknown'
        # Write a temporary CSV for the import step
        temp_csv = os.path.join(os.path.dirname(distribution_csv), f"temp_combined_{os.path.basename(distribution_csv)}")
        dist_df.to_csv(temp_csv, index=False)
        # Import the merged distribution records
        success = import_distribution(conn, temp_csv, batch_id)
        # Remove the temporary file
        try:
            os.remove(temp_csv)
        except OSError:
            pass
        if success:
            logger.info(f"Distribution results imported; batch ID: {batch_id}")
            return True
        else:
            logger.error("Failed to import distribution results into the database")
            return False
    except Exception as e:
        logger.error(f"Failed to import distribution results: {e}")
        return False

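# Design note: the temporary CSV exists only because import_distribution
# takes a file path. A refactor could accept the merged DataFrame directly
# and skip the disk round trip; as written, each import stage also remains
# independently runnable from the command line.
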
def record_extracted_content(
    entry_id,
    output_txt_path=None,
    poster_path=None,
    article_json_path=None,
    product=None,
    object=None,
    date=None,
    logic=None,
    judge_status=None,
    is_distributed=0,
    db_path=None
):
    """Record a piece of content in the database as it is extracted.

    This function can be called from the extract_and_render.py script to
    record extracted content in real time.

    Args:
        entry_id: Content ID
        output_txt_path: Path to the output text file
        poster_path: Path to the poster image
        article_json_path: Path to the article JSON file
        product: Product name
        object: Attraction name
        date: Date
        logic: Logic
        judge_status: Review status (0 = not approved, 1 = approved)
        is_distributed: Whether the content has been distributed (0 = no, 1 = yes)
        db_path: Database path; defaults to the configured path

    Returns:
        bool: True if the content was recorded successfully
    """
    try:
        # Use the default database path
        if db_path is None:
            db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
        # Check for an existing database file before connecting
        # (sqlite3.connect creates the file as a side effect)
        db_exists = os.path.exists(db_path)
        conn = create_connection(db_path)
        if not conn:
            logger.error(f"Cannot connect to database: {db_path}")
            return False
        if not db_exists:
            logger.warning(f"Database file did not exist: {db_path}; creating it")
            init_database(conn)
        try:
            cursor = conn.cursor()
            # Prepare the data
            data = {
                'entry_id': entry_id,
                'output_txt_path': output_txt_path or '',
                'poster_path': poster_path or '',
                'article_json_path': article_json_path or '',
                'product': product or '',
                'object': object or '',
                'date': date or '',
                'logic': logic or '',
                'judge_status': judge_status,
                'is_distributed': is_distributed
            }
            # Insert or update the content row
            cursor.execute("""
                INSERT OR REPLACE INTO contents
                (entry_id, output_txt_path, poster_path, article_json_path,
                 product, object, date, logic, judge_status, is_distributed)
                VALUES
                (:entry_id, :output_txt_path, :poster_path, :article_json_path,
                 :product, :object, :date, :logic, :judge_status, :is_distributed)
            """, data)
            conn.commit()
            logger.info(f"Recorded content {entry_id} in the database")
            return True
        except Exception as e:
            logger.error(f"Failed to record content in the database: {e}")
            conn.rollback()
            return False
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Error while recording extracted content: {e}")
        return False

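# Illustrative call site (values are hypothetical; extract_and_render.py is
# expected to supply its own real paths and IDs):
#
#     record_extracted_content(
#         entry_id='20240501_0001',
#         output_txt_path='outputs/20240501_0001.txt',
#         poster_path='outputs/20240501_0001.png',
#         product='Hangzhou Day Tour',
#         object='West Lake',
#         judge_status=1,
#     )
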
def query_content_status(conn, entry_id=None, product=None, object=None, output=None):
    """Query content distribution status."""
    try:
        cursor = conn.cursor()
        # Build the filter conditions
        conditions = []
        params = []
        if entry_id:
            conditions.append("c.entry_id = ?")
            params.append(entry_id)
        if product:
            conditions.append("c.product LIKE ?")
            params.append(f"%{product}%")
        if object:
            conditions.append("c.object LIKE ?")
            params.append(f"%{object}%")
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        # Run the query
        query = f"""
            SELECT
                c.entry_id,
                c.product,
                c.object,
                c.judge_status,
                COUNT(DISTINCT d.user_id) as distribution_count,
                SUM(CASE WHEN d.send_status LIKE '%success%' THEN 1 ELSE 0 END) as success_count,
                GROUP_CONCAT(DISTINCT u.email) as recipients
            FROM
                contents c
            LEFT JOIN
                distributions d ON c.id = d.content_id
            LEFT JOIN
                users u ON d.user_id = u.id
            WHERE
                {where_clause}
            GROUP BY
                c.id
            ORDER BY
                distribution_count DESC, c.entry_id
        """
        cursor.execute(query, params)
        results = [dict(row) for row in cursor.fetchall()]
        # Report the results
        if results:
            logger.info(f"Found {len(results)} content status records")
            # Write to the output file if one was given
            if output:
                df = pd.DataFrame(results)
                df.to_csv(output, index=False)
                logger.info(f"Query results saved to: {output}")
            else:
                # Print a short summary (first 10 rows only)
                for row in results[:10]:
                    status = "approved" if row['judge_status'] == 1 else "not approved" if row['judge_status'] == 0 else "unknown"
                    logger.info(f"Content ID: {row['entry_id']}, product: {row['product']}, attraction: {row['object']}, "
                                f"status: {status}, distributions: {row['distribution_count']}, successes: {row['success_count']}")
                if len(results) > 10:
                    logger.info(f"... {len(results) - 10} more records not shown")
        else:
            logger.info("No matching content records found")
        return results
    except Exception as e:
        logger.error(f"Failed to query content status: {e}")
        return []

def query_user_contents(conn, email, output=None):
    """Query the contents a user has received."""
    try:
        cursor = conn.cursor()
        # Check that the user exists
        cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
        user_result = cursor.fetchone()
        if not user_result:
            logger.info(f"User {email} is not in the database")
            return []
        user_id = user_result[0]
        # Query the contents received by the user
        query = """
            SELECT
                u.email,
                u.username,
                c.entry_id,
                c.product,
                c.object,
                d.distribution_date,
                d.send_status,
                d.batch_id
            FROM
                distributions d
            JOIN
                contents c ON d.content_id = c.id
            JOIN
                users u ON d.user_id = u.id
            WHERE
                d.user_id = ?
            ORDER BY
                d.distribution_date DESC
        """
        cursor.execute(query, (user_id,))
        results = [dict(row) for row in cursor.fetchall()]
        # Report the results
        if results:
            logger.info(f"User {email} has received {len(results)} contents")
            # Write to the output file if one was given
            if output:
                df = pd.DataFrame(results)
                df.to_csv(output, index=False)
                logger.info(f"Query results saved to: {output}")
            else:
                # Print a short summary (first 10 rows only)
                for row in results[:10]:
                    dist_date = row['distribution_date']
                    logger.info(f"Content ID: {row['entry_id']}, product: {row['product']}, attraction: {row['object']}, "
                                f"sent at: {dist_date}, status: {row['send_status']}, batch: {row['batch_id']}")
                if len(results) > 10:
                    logger.info(f"... {len(results) - 10} more records not shown")
        else:
            logger.info(f"User {email} has not received any content")
        return results
    except Exception as e:
        logger.error(f"Failed to query user contents: {e}")
        return []

def generate_report(conn, output=None, format='csv'):
    """Generate a statistics report."""
    try:
        cursor = conn.cursor()
        # Basic statistics
        statistics = {}
        # Total number of contents
        cursor.execute("SELECT COUNT(*) FROM contents")
        statistics['total_contents'] = cursor.fetchone()[0]
        # Total number of users
        cursor.execute("SELECT COUNT(*) FROM users")
        statistics['total_users'] = cursor.fetchone()[0]
        # Total number of distribution records
        cursor.execute("SELECT COUNT(*) FROM distributions")
        statistics['total_distributions'] = cursor.fetchone()[0]
        # Number of successfully sent records
        cursor.execute("SELECT COUNT(*) FROM distributions WHERE send_status LIKE '%success%'")
        statistics['successful_distributions'] = cursor.fetchone()[0]
        # Distribution counts per batch
        cursor.execute("""
            SELECT batch_id, COUNT(*) as count
            FROM distributions
            GROUP BY batch_id
            ORDER BY count DESC
        """)
        statistics['batch_statistics'] = [dict(row) for row in cursor.fetchall()]
        # Distribution statistics per product
        cursor.execute("""
            SELECT
                c.product,
                COUNT(DISTINCT c.id) as total_contents,
                COUNT(DISTINCT d.id) as distribution_count,
                COUNT(DISTINCT d.user_id) as user_count,
                SUM(CASE WHEN d.send_status LIKE '%success%' THEN 1 ELSE 0 END) as success_count
            FROM
                contents c
            LEFT JOIN
                distributions d ON c.id = d.content_id
            WHERE
                c.product IS NOT NULL AND c.product != ''
            GROUP BY
                c.product
            ORDER BY
                distribution_count DESC
        """)
        statistics['product_statistics'] = [dict(row) for row in cursor.fetchall()]
        # Distribution statistics per attraction
        cursor.execute("""
            SELECT
                c.object,
                COUNT(DISTINCT c.id) as total_contents,
                COUNT(DISTINCT d.id) as distribution_count,
                COUNT(DISTINCT d.user_id) as user_count,
                SUM(CASE WHEN d.send_status LIKE '%success%' THEN 1 ELSE 0 END) as success_count
            FROM
                contents c
            LEFT JOIN
                distributions d ON c.id = d.content_id
            WHERE
                c.object IS NOT NULL AND c.object != ''
            GROUP BY
                c.object
            ORDER BY
                distribution_count DESC
            LIMIT 20
        """)
        statistics['object_statistics'] = [dict(row) for row in cursor.fetchall()]
        # Most distributed contents
        cursor.execute("""
            SELECT
                c.entry_id,
                c.product,
                c.object,
                COUNT(d.id) as distribution_count,
                SUM(CASE WHEN d.send_status LIKE '%success%' THEN 1 ELSE 0 END) as success_count
            FROM
                contents c
            JOIN
                distributions d ON c.id = d.content_id
            GROUP BY
                c.id
            ORDER BY
                distribution_count DESC
            LIMIT 10
        """)
        statistics['most_distributed_contents'] = [dict(row) for row in cursor.fetchall()]
        # Users who received the most content
        cursor.execute("""
            SELECT
                u.email,
                u.username,
                COUNT(d.id) as received_count,
                SUM(CASE WHEN d.send_status LIKE '%success%' THEN 1 ELSE 0 END) as success_count
            FROM
                users u
            JOIN
                distributions d ON u.id = d.user_id
            GROUP BY
                u.id
            ORDER BY
                received_count DESC
            LIMIT 10
        """)
        statistics['most_active_users'] = [dict(row) for row in cursor.fetchall()]
        # Log the headline statistics
        logger.info("\n====== Content Distribution Database Report ======")
        logger.info(f"Total contents: {statistics['total_contents']}")
        logger.info(f"Total users: {statistics['total_users']}")
        logger.info(f"Total distribution records: {statistics['total_distributions']}")
        logger.info(f"Successfully sent records: {statistics['successful_distributions']}")
        logger.info("\n----- By product -----")
        for product in statistics['product_statistics'][:5]:
            if product['distribution_count'] > 0:
                logger.info(f"Product: {product['product']}, contents: {product['total_contents']}, "
                            f"distributions: {product['distribution_count']}, successes: {product['success_count']}")
        logger.info("\n----- Most distributed contents -----")
        for content in statistics['most_distributed_contents'][:5]:
            logger.info(f"Content ID: {content['entry_id']}, product: {content['product']}, "
                        f"attraction: {content['object']}, distributions: {content['distribution_count']}, "
                        f"successes: {content['success_count']}")
        # Write the report
        if output:
            if format == 'json':
                with open(output, 'w', encoding='utf-8') as f:
                    json.dump(statistics, f, ensure_ascii=False, indent=2)
            else:  # CSV format: write one file per table
                output_base = os.path.splitext(output)[0]
                # Product statistics
                pd.DataFrame(statistics['product_statistics']).to_csv(f"{output_base}_products.csv", index=False)
                # Attraction statistics
                pd.DataFrame(statistics['object_statistics']).to_csv(f"{output_base}_objects.csv", index=False)
                # Content distribution statistics
                pd.DataFrame(statistics['most_distributed_contents']).to_csv(f"{output_base}_contents.csv", index=False)
                # User reception statistics
                pd.DataFrame(statistics['most_active_users']).to_csv(f"{output_base}_users.csv", index=False)
                # Batch statistics
                pd.DataFrame(statistics['batch_statistics']).to_csv(f"{output_base}_batches.csv", index=False)
                # Headline statistics
                basic_stats = {k: v for k, v in statistics.items() if not isinstance(v, list)}
                pd.DataFrame([basic_stats]).to_csv(f"{output_base}_summary.csv", index=False)
            logger.info(f"Report saved to: {output}")
        return statistics
    except Exception as e:
        logger.error(f"Failed to generate report: {e}")
        return {}

def main():
    args = parse_arguments()
    # Create the database connection
    conn = create_connection(args.db_path)
    if not conn:
        logger.error("Cannot connect to the database; exiting")
        return
    try:
        # Dispatch on the subcommand
        if args.command == 'init':
            init_database(conn)
        elif args.command == 'import-contents':
            import_contents(conn, args.file)
        elif args.command == 'import-users':
            import_users(conn, args.file, args.email_column, args.username_column)
        elif args.command == 'import-distribution':
            import_distribution(conn, args.file, args.batch_id)
        elif args.command == 'import-distribution-result':
            import_distribution_result(conn, args.distribution_csv, args.result_csv,
                                       args.manifest_csv, args.batch_id)
        elif args.command == 'query-content':
            query_content_status(conn, args.entry_id, args.product, args.object, args.output)
        elif args.command == 'query-user':
            query_user_contents(conn, args.email, args.output)
        elif args.command == 'report':
            generate_report(conn, args.output, args.format)
        else:
            logger.error("No valid command given; use --help for usage")
    finally:
        conn.close()


if __name__ == "__main__":
    main()