修改了适用于新编码结果的分发方式

This commit is contained in:
jinye_huang 2025-05-19 20:52:04 +08:00
parent b7c5f92e89
commit fe1cbae9c8
3 changed files with 381 additions and 65 deletions

View File

@ -16,8 +16,8 @@ EMAIL_FROM="zwysendemail@163.com"
EMAIL_PASSWORD="NMhVGFmCJkGEy3B5"
# EMAIL_FROM="zowoyomedia@163.com"
# EMAIL_PASSWORD="SDj5fK6Tk9YevmsD"
SUBJECT="文旅小红书带货笔记内容0516"
ZIP_FILENAME="文旅小红书带货笔记内容0516"
SUBJECT="文旅小红书带货笔记内容0519"
ZIP_FILENAME="文旅小红书带货笔记内容0519"
# 设置分发配置
ARTICLE_PER_USER=1
@ -25,7 +25,7 @@ MAX_USERS=-1 # 最多发送给多少用户,不限制则设置为-1
TEST_MODE=false # 测试模式,不实际发送邮件
# 测试邮箱配置(新增)
TEST_EMAIL_MODE=true # 是否启用测试邮箱模式
TEST_EMAIL_MODE=false # 是否启用测试邮箱模式
TEST_EMAIL="jinye_huang@foxmail.com" # 测试邮箱地址,所有邮件都会发送到这里
JUDGE_ONLY=true # 只分发审核通过的内容
@ -33,7 +33,7 @@ UNDISTRIBUTED_ONLY=true # 只分发未分发的内容
# 内容筛选配置
TARGET_PRODUCT="" # 为空则不筛选特定产品
TARGET_OBJECT="深圳青青世界酒店" # 为空则不筛选特定景点
TARGET_OBJECT="北洛秘境盛季酒店" # 为空则不筛选特定景点
# 用户筛选配置
TARGET_USER_ID="" # 为空则不筛选特定用户ID

View File

@ -11,6 +11,7 @@ import argparse
from datetime import datetime
import sqlite3
import logging
import base64
# 配置日志
logging.basicConfig(
@ -140,6 +141,9 @@ def convert_json_to_txt_content(json_path, prefer_original=False):
- judge_success为True时使用title/content除非prefer_original=True
- judge_success为False时使用original_title/original_content
支持base64编码的内容
- 如果检测到title_base64和content_base64字段将优先使用这些字段
Args:
json_path: JSON文件路径
prefer_original: 是否优先使用原始内容无视judge_success结果
@ -153,46 +157,105 @@ def convert_json_to_txt_content(json_path, prefer_original=False):
with open(json_path, 'r', encoding='utf-8') as f_json:
data = json.load(f_json)
# 根据judge_success选择标题和内容
judge_success = data.get('judge_success', None)
# 优先检查是否有base64编码的内容
title = None
content = None
original_title = None
original_content = None
tags = None
if prefer_original and 'original_title' in data and 'original_content' in data:
# 优先使用原始内容
title = data.get('original_title', '未找到原始标题')
content = data.get('original_content', '未找到原始内容')
# 优先使用原始标签
tags = data.get('original_tags', data.get('tags', '未找到标签'))
print(f" - 优先使用原始内容 (prefer_original=True)")
elif judge_success is True and not prefer_original:
# 使用审核后的内容
title = data.get('title', '未找到标题')
content = data.get('content', '未找到内容')
tags = data.get('tags', '未找到标签')
print(f" - 使用审核后内容 (judge_success=True)")
elif 'original_title' in data and 'original_content' in data:
# 使用原始内容
title = data.get('original_title', '未找到原始标题')
content = data.get('original_content', '未找到原始内容')
# 优先使用原始标签
tags = data.get('original_tags', data.get('tags', '未找到标签'))
print(f" - 使用原始内容 (judge_success={judge_success})")
else:
# 若无original字段使用常规字段
title = data.get('title', '未找到标题')
content = data.get('content', '未找到内容')
tags = data.get('tags', '未找到标签')
print(f" - 使用常规内容 (无judge结果)")
# 尝试从base64字段获取内容
try:
# 优先使用base64编码的内容
if "title_base64" in data:
title = base64.b64decode(data["title_base64"]).decode('utf-8')
print(f" - 成功从base64解码标题")
if "content_base64" in data:
content = base64.b64decode(data["content_base64"]).decode('utf-8')
print(f" - 成功从base64解码内容")
if "tags_base64" in data:
tags = base64.b64decode(data["tags_base64"]).decode('utf-8')
print(f" - 成功从base64解码标签")
elif "tags" in data:
tags = data.get("tags", "")
elif "tag" in data:
tags = data.get("tag", "")
# 检查是否有原始内容的base64
if "original_title_base64" in data:
original_title = base64.b64decode(data["original_title_base64"]).decode('utf-8')
if "original_content_base64" in data:
original_content = base64.b64decode(data["original_content_base64"]).decode('utf-8')
if "original_tags_base64" in data:
original_tags = base64.b64decode(data["original_tags_base64"]).decode('utf-8')
elif "original_tags" in data:
original_tags = data.get("original_tags", "")
# 如果prefer_original为True且有原始内容使用原始内容
if prefer_original and original_title and original_content:
title = original_title
content = original_content
tags = original_tags if original_tags else tags
print(f" - 使用解码后的原始内容 (prefer_original=True)")
except Exception as e:
print(f" - 警告: base64解码失败: {e},将尝试使用普通字段")
title = None
content = None
# 解决tag/tags字段重复问题按照修正后的处理逻辑只使用tags字段
if not tags and 'tag' in data:
tags = data.get('tag', '未找到标签')
print(f" - 使用tag字段作为标签 (该字段将在后续版本中统一为tags)")
# 如果base64解码失败或不存在base64字段则使用原始逻辑
if title is None or content is None:
# 根据judge_success选择标题和内容
judge_success = data.get('judge_success', None)
if prefer_original and 'original_title' in data and 'original_content' in data:
# 优先使用原始内容
title = data.get('original_title', '未找到原始标题')
content = data.get('original_content', '未找到原始内容')
# 优先使用原始标签
tags = data.get('original_tags', data.get('tags', '未找到标签'))
print(f" - 优先使用原始内容 (prefer_original=True)")
elif judge_success is True and not prefer_original:
# 使用审核后的内容
title = data.get('title', '未找到标题')
content = data.get('content', '未找到内容')
tags = data.get('tags', '未找到标签')
print(f" - 使用审核后内容 (judge_success=True)")
elif 'original_title' in data and 'original_content' in data:
# 使用原始内容
title = data.get('original_title', '未找到原始标题')
content = data.get('original_content', '未找到原始内容')
# 优先使用原始标签
tags = data.get('original_tags', data.get('tags', '未找到标签'))
print(f" - 使用原始内容 (judge_success={judge_success})")
else:
# 若无original字段使用常规字段
title = data.get('title', '未找到标题')
content = data.get('content', '未找到内容')
tags = data.get('tags', '未找到标签')
print(f" - 使用常规内容 (无judge结果)")
# 解决tag/tags字段重复问题按照修正后的处理逻辑只使用tags字段
if not tags and 'tag' in data:
tags = data.get('tag', '未找到标签')
print(f" - 使用tag字段作为标签 (该字段将在后续版本中统一为tags)")
# 移除Markdown格式
# 移除Markdown格式,但保留换行符
content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
# 组合输出文本
return f"{title}\n\n{content_no_format}\n\n{tags}", None
# 组合输出文本,保留原始内容的所有换行符
result = ""
if title:
result += title + "\n\n"
if content_no_format:
result += content_no_format
if tags and tags != "未找到标签":
result += "\n\n" + tags
return result, None
except json.JSONDecodeError:
print(f" - 错误: JSON 格式无效: {json_path}")
return None, f"无效的 JSON 格式: {json_path}"
@ -200,6 +263,41 @@ def convert_json_to_txt_content(json_path, prefer_original=False):
print(f" - 错误: 处理 JSON 时出错: {e}")
return None, f"处理 JSON 时出错: {e}"
def process_txt_content(txt_path):
"""
直接读取TXT文件内容移除Markdown格式并返回处理后的文本
Args:
txt_path: TXT文件路径
Returns:
tuple: (处理后的内容错误信息)
"""
print(f" - 正在读取TXT: {txt_path}")
if not os.path.exists(txt_path):
print(f" - 警告: TXT文件不存在: {txt_path}")
return None, f"文件未找到: {txt_path}"
try:
# 读取TXT文件内容
with open(txt_path, 'r', encoding='utf-8') as f_txt:
content = f_txt.read()
# 移除Markdown格式但保留换行符
# 处理粗体
content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
# 处理斜体
content_no_format = re.sub(r'\*(.*?)\*', r'\1', content_no_format)
# 处理链接 [文本](链接)
content_no_format = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', content_no_format)
# 处理标题 # 文本
content_no_format = re.sub(r'^#+ (.*?)$', r'\1', content_no_format, flags=re.MULTILINE)
return content_no_format, None
except Exception as e:
print(f" - 错误: 处理TXT时出错: {e}")
return None, f"处理TXT时出错: {e}"
def load_topic_data(source_dir, run_id):
"""
加载选题数据
@ -364,14 +462,15 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina
print(f" - 错误: {record['Details']}")
continue
# 1. 处理article.json -> txt
json_path = os.path.join(entry_path, "article.json")
txt_path = os.path.join(output_entry_path, "article.txt")
record["ArticleJsonPath"] = json_path
record["OutputTxtPath"] = txt_path
# 1. 处理article.txt
input_txt_path = os.path.join(entry_path, "article.txt")
output_txt_path = os.path.join(output_entry_path, "article.txt")
record["OutputTxtPath"] = output_txt_path
# 读取article.json仅用于获取judge_status
json_path = os.path.join(entry_path, "article.json")
record["ArticleJsonPath"] = json_path
# 读取article.json
article_data = {}
if os.path.exists(json_path):
try:
with open(json_path, 'r', encoding='utf-8') as f_json:
@ -382,28 +481,21 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina
elif "judged" in article_data:
record["JudgeStatus"] = "已审核" if article_data["judged"] else "未审核"
except Exception as e:
print(f" - 错误: 读取article.json失败: {e}")
txt_content, error = convert_json_to_txt_content(json_path, prefer_original)
print(f" - 警告: 读取article.json失败: {e}")
# 处理article.txt文件
if os.path.exists(input_txt_path):
processed_content, error = process_txt_content(input_txt_path)
if error:
record["Status"] = "Partial"
record["Details"] += f"文章处理失败: {error}; "
print(f" - 错误: {record['Details']}")
else:
try:
with open(txt_path, 'w', encoding='utf-8') as f_txt:
f_txt.write(txt_content)
print(f" - 成功写入文本文件: {txt_path}")
# 记录内容来源
if prefer_original:
record["ContentSource"] = "original_preferred"
elif article_data.get("judge_success") is True:
record["ContentSource"] = "judged"
elif "original_title" in article_data:
record["ContentSource"] = "original"
else:
record["ContentSource"] = "default"
with open(output_txt_path, 'w', encoding='utf-8') as f_txt:
f_txt.write(processed_content)
print(f" - 成功写入处理后的文本文件: {output_txt_path}")
record["ContentSource"] = "txt_file"
except Exception as e:
record["Status"] = "Partial"
@ -411,7 +503,7 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina
print(f" - 错误: {record['Details']}")
else:
record["Status"] = "Partial"
record["Details"] += "文章JSON文件不存在; "
record["Details"] += "文章TXT文件不存在; "
print(f" - 警告: {record['Details']}")
# 2. 处理海报图片
@ -535,8 +627,8 @@ def main():
args = parser.parse_args()
# 默认值设置
source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-15_20-01-48"
output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-15_20-01-48"
source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-19_17-51-07"
output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-19_17-51-07"
run_id = args.run_id if args.run_id else os.path.basename(source)
prefer_original = args.prefer_original
db_path = args.db_path if args.db_path else '/root/autodl-tmp/TravelContentCreator/distribution.db'

View File

@ -0,0 +1,224 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sqlite3
import csv
import sys
import argparse
from datetime import datetime
# 数据库路径
db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
def query_products(product_name=None, object_name=None, output_file=None, show_undistributed_only=False):
"""查询产品信息
Args:
product_name: 产品名称可选
object_name: 景点名称可选
output_file: 输出CSV文件路径可选
show_undistributed_only: 是否只显示未分发的内容
Returns:
查询结果列表
"""
# 连接数据库
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row # 设置结果为字典格式
cursor = conn.cursor()
print(f"已连接到数据库: {db_path}")
except sqlite3.Error as e:
print(f"数据库连接错误: {e}")
return []
try:
# 构建查询条件
conditions = []
params = []
if product_name:
conditions.append("product LIKE ?")
params.append(f"%{product_name}%")
if object_name:
conditions.append("object LIKE ?")
params.append(f"%{object_name}%")
if show_undistributed_only:
conditions.append("is_distributed = 0")
# 构建WHERE子句
where_clause = " AND ".join(conditions) if conditions else "1=1"
# 执行查询
query = f"""
SELECT
id, entry_id, product, object, date, logic, judge_status,
output_txt_path, poster_path, article_json_path, created_at, is_distributed
FROM
contents
WHERE
{where_clause}
ORDER BY
product, object, entry_id
"""
cursor.execute(query, params)
results = [dict(row) for row in cursor.fetchall()]
# 获取分布统计信息
statistics = {}
if results:
# 按产品分组统计
cursor.execute(f"""
SELECT
product,
COUNT(*) as count,
COUNT(CASE WHEN judge_status = 1 THEN 1 END) as approved_count,
COUNT(CASE WHEN is_distributed = 1 THEN 1 END) as distributed_count
FROM
contents
WHERE
product IS NOT NULL AND product != '' AND {where_clause}
GROUP BY
product
ORDER BY
count DESC
""", params)
statistics['products'] = [dict(row) for row in cursor.fetchall()]
# 按景点分组统计
cursor.execute(f"""
SELECT
object,
COUNT(*) as count,
COUNT(CASE WHEN judge_status = 1 THEN 1 END) as approved_count,
COUNT(CASE WHEN is_distributed = 1 THEN 1 END) as distributed_count
FROM
contents
WHERE
object IS NOT NULL AND object != '' AND {where_clause}
GROUP BY
object
ORDER BY
count DESC
LIMIT 20
""", params)
statistics['objects'] = [dict(row) for row in cursor.fetchall()]
# 输出结果
if results:
print(f"\n查询到 {len(results)} 条产品记录")
# 输出前10条记录
print("\n===== 查询结果 (前10条) =====")
for i, row in enumerate(results[:10], 1):
judge_status = '已通过' if row['judge_status'] == 1 else '未通过' if row['judge_status'] == 0 else '未知'
distributed = '已分发' if row['is_distributed'] == 1 else '未分发'
print(f"{i}. ID: {row['entry_id']}, 产品: {row['product']}, 景点: {row['object']}, 审核: {judge_status}, 分发状态: {distributed}")
if len(results) > 10:
print(f"... 还有 {len(results) - 10} 条记录未显示")
# 输出统计信息
if 'products' in statistics and statistics['products']:
print("\n===== 产品统计 =====")
for prod in statistics['products']:
dist_percent = (prod['distributed_count'] / prod['count'] * 100) if prod['count'] > 0 else 0
approved_percent = (prod['approved_count'] / prod['count'] * 100) if prod['count'] > 0 else 0
print(f"产品: {prod['product']}")
print(f" - 内容总数: {prod['count']}")
print(f" - 已审核通过: {prod['approved_count']} ({approved_percent:.1f}%)")
print(f" - 已分发: {prod['distributed_count']} ({dist_percent:.1f}%)")
print(f" - 未分发: {prod['count'] - prod['distributed_count']} ({100 - dist_percent:.1f}%)")
if 'objects' in statistics and statistics['objects'] and len(statistics['objects']) <= 10:
print("\n===== 景点统计 =====")
for obj in statistics['objects']:
dist_percent = (obj['distributed_count'] / obj['count'] * 100) if obj['count'] > 0 else 0
approved_percent = (obj['approved_count'] / obj['count'] * 100) if obj['count'] > 0 else 0
print(f"景点: {obj['object']}")
print(f" - 内容总数: {obj['count']}")
print(f" - 已审核通过: {obj['approved_count']} ({approved_percent:.1f}%)")
print(f" - 已分发: {obj['distributed_count']} ({dist_percent:.1f}%)")
print(f" - 未分发: {obj['count'] - obj['distributed_count']} ({100 - dist_percent:.1f}%)")
# 未分发内容和已分发内容的汇总
total_count = len(results)
distributed_count = sum(1 for r in results if r['is_distributed'] == 1)
undistributed_count = total_count - distributed_count
print("\n===== 分发状态汇总 =====")
print(f"总内容数: {total_count}")
print(f"已分发: {distributed_count} ({distributed_count/total_count*100:.1f}% 如果为0)" if total_count > 0 else "已分发: 0 (0%)")
print(f"未分发: {undistributed_count} ({undistributed_count/total_count*100:.1f}% 如果为100%)" if total_count > 0 else "未分发: 0 (0%)")
# 如果指定了输出文件保存结果到CSV
if output_file:
# 确保目录存在
os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)
with open(output_file, 'w', newline='', encoding='utf-8-sig') as f:
if results:
# 确定CSV列
fieldnames = list(results[0].keys())
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print(f"\n查询结果已保存到: {output_file}")
else:
print("\n未查询到相关产品记录")
# 检查数据库是否有任何内容记录
cursor.execute("SELECT COUNT(*) as count FROM contents")
count = cursor.fetchone()['count']
if count == 0:
print("\n提示: 数据库中没有任何内容记录,请先导入数据")
else:
print(f"\n提示: 数据库中有 {count} 条内容记录,但没有符合条件的产品")
return results
except Exception as e:
print(f"查询产品时出错: {e}")
import traceback
traceback.print_exc()
return []
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description="查询数据库中的产品信息")
parser.add_argument("--product", type=str, help="按产品名称查询")
parser.add_argument("--object", type=str, help="按景点名称查询")
parser.add_argument("--output", type=str, help="输出CSV文件路径")
parser.add_argument("--all", action="store_true", help="查询所有产品")
parser.add_argument("--export-csv", action="store_true", help="导出结果到CSV文件")
parser.add_argument("--undistributed", action="store_true", help="只显示未分发的内容")
args = parser.parse_args()
# 默认输出文件
output_file = None
if args.output:
output_file = args.output
elif args.export_csv or args.all:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
suffix = "_undistributed" if args.undistributed else ""
output_file = f"/root/autodl-tmp/TravelContentCreator/output/product_query{suffix}_{timestamp}.csv"
# 执行查询
if args.product or args.object or args.all or args.undistributed:
results = query_products(args.product, args.object, output_file, args.undistributed)
return len(results) > 0
else:
print("请提供查询条件: --product, --object, --all 或 --undistributed")
parser.print_help()
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)