From fe1cbae9c84f9607a86b242c28fefd3b65c1d27e Mon Sep 17 00:00:00 2001
From: jinye_huang
Date: Mon, 19 May 2025 20:52:04 +0800
Subject: [PATCH] Update the distribution flow for the new encoded output format
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scripts/distribution/distribute_example.sh |   8 +-
 scripts/distribution/extract_and_render.py | 214 ++++++++++++++------
 scripts/distribution/query_products.py     | 224 +++++++++++++++++++++
 3 files changed, 381 insertions(+), 65 deletions(-)
 create mode 100755 scripts/distribution/query_products.py

diff --git a/scripts/distribution/distribute_example.sh b/scripts/distribution/distribute_example.sh
index 0b47d7a..4b05543 100755
--- a/scripts/distribution/distribute_example.sh
+++ b/scripts/distribution/distribute_example.sh
@@ -16,8 +16,8 @@ EMAIL_FROM="zwysendemail@163.com"
 EMAIL_PASSWORD="NMhVGFmCJkGEy3B5"
 # EMAIL_FROM="zowoyomedia@163.com"
 # EMAIL_PASSWORD="SDj5fK6Tk9YevmsD"
-SUBJECT="文旅小红书带货笔记内容0516"
-ZIP_FILENAME="文旅小红书带货笔记内容0516"
+SUBJECT="文旅小红书带货笔记内容0519"
+ZIP_FILENAME="文旅小红书带货笔记内容0519"
 
 # Distribution settings
 ARTICLE_PER_USER=1
@@ -25,7 +25,7 @@ MAX_USERS=-1 # Maximum number of recipient users; set to -1 for no limit
 TEST_MODE=false # Test mode: do not actually send emails
 
 # Test mailbox settings (new)
-TEST_EMAIL_MODE=true # Whether to enable test mailbox mode
+TEST_EMAIL_MODE=false # Whether to enable test mailbox mode
 TEST_EMAIL="jinye_huang@foxmail.com" # Test mailbox address; all emails are sent here
 
 JUDGE_ONLY=true # Distribute only content that passed review
@@ -33,7 +33,7 @@ UNDISTRIBUTED_ONLY=true # Distribute only content not yet distributed
 
 # Content filter settings
 TARGET_PRODUCT="" # Leave empty to skip filtering by product
-TARGET_OBJECT="深圳青青世界酒店" # Leave empty to skip filtering by attraction
+TARGET_OBJECT="北洛秘境盛季酒店" # Leave empty to skip filtering by attraction
 
 # User filter settings
 TARGET_USER_ID="" # Leave empty to skip filtering by user ID
diff --git a/scripts/distribution/extract_and_render.py b/scripts/distribution/extract_and_render.py
index f43c26c..5d30cee 100644
--- a/scripts/distribution/extract_and_render.py
+++ b/scripts/distribution/extract_and_render.py
@@ -11,6 +11,7 @@ import argparse
 from datetime import datetime
 import sqlite3
 import logging
+import base64
 
 # Logging configuration
 logging.basicConfig(
@@ -140,6 +141,9 @@ def convert_json_to_txt_content(json_path, prefer_original=False):
     - When judge_success is True, use title/content (unless prefer_original=True)
     - When judge_success is False, use original_title/original_content
 
+    Base64-encoded content is supported:
+    - If title_base64 and content_base64 fields are detected, these fields take precedence
+
     Args:
         json_path: path to the JSON file
         prefer_original: whether to prefer the original content, ignoring judge_success
@@ -153,46 +157,105 @@
         with open(json_path, 'r', encoding='utf-8') as f_json:
             data = json.load(f_json)
 
-        # Choose title and content based on judge_success
-        judge_success = data.get('judge_success', None)
+        # First check for base64-encoded content
+        title = None
+        content = None
+        original_title = None
+        original_content = None
+        original_tags = tags = None  # initialize original_tags too, so the fallback below cannot hit an unbound name
 
-        if prefer_original and 'original_title' in data and 'original_content' in data:
-            # Prefer the original content
-            title = data.get('original_title', 'Original title not found')
-            content = data.get('original_content', 'Original content not found')
-            # Prefer the original tags
-            tags = data.get('original_tags', data.get('tags', 'Tags not found'))
-            print(f"  - Using original content (prefer_original=True)")
-        elif judge_success is True and not prefer_original:
-            # Use the reviewed content
-            title = data.get('title', 'Title not found')
-            content = data.get('content', 'Content not found')
-            tags = data.get('tags', 'Tags not found')
-            print(f"  - Using reviewed content (judge_success=True)")
-        elif 'original_title' in data and 'original_content' in data:
-            # Use the original content
-            title = data.get('original_title', 'Original title not found')
-            content = data.get('original_content', 'Original content not found')
-            # Prefer the original tags
-            tags = data.get('original_tags', data.get('tags', 'Tags not found'))
-            print(f"  - Using original content (judge_success={judge_success})")
-        else:
-            # No original fields; use the regular fields
-            title = data.get('title', 'Title not found')
-            content = data.get('content', 'Content not found')
-            tags = data.get('tags', 'Tags not found')
-            print(f"  - Using regular content (no judge result)")
+        # Try to read the content from the base64 fields
+        try:
+            # Base64-encoded content takes precedence
+            if "title_base64" in data:
+                title = base64.b64decode(data["title_base64"]).decode('utf-8')
+                print(f"  - Decoded title from base64")
+
+            if "content_base64" in data:
+                content = base64.b64decode(data["content_base64"]).decode('utf-8')
+                print(f"  - Decoded content from base64")
+
+            if "tags_base64" in data:
+                tags = base64.b64decode(data["tags_base64"]).decode('utf-8')
+                print(f"  - Decoded tags from base64")
+            elif "tags" in data:
+                tags = data.get("tags", "")
+            elif "tag" in data:
+                tags = data.get("tag", "")
+
+            # Check for base64-encoded original content
+            if "original_title_base64" in data:
+                original_title = base64.b64decode(data["original_title_base64"]).decode('utf-8')
+
+            if "original_content_base64" in data:
+                original_content = base64.b64decode(data["original_content_base64"]).decode('utf-8')
+
+            if "original_tags_base64" in data:
+                original_tags = base64.b64decode(data["original_tags_base64"]).decode('utf-8')
+            elif "original_tags" in data:
+                original_tags = data.get("original_tags", "")
+
+            # If prefer_original is set and original content exists, use it
+            if prefer_original and original_title and original_content:
+                title = original_title
+                content = original_content
+                tags = original_tags if original_tags else tags
+                print(f"  - Using decoded original content (prefer_original=True)")
+        except Exception as e:
+            print(f"  - Warning: base64 decoding failed: {e}; falling back to the plain fields")
+            title = None
+            content = None
 
-        # Resolve the tag/tags field duplication; per the corrected logic, only tags is used
-        if not tags and 'tag' in data:
-            tags = data.get('tag', 'Tags not found')
-            print(f"  - Using the tag field as tags (to be unified into tags in a later version)")
+        # If base64 decoding failed or there are no base64 fields, fall back to the original logic
+        if title is None or content is None:
+            # Choose title and content based on judge_success
+            judge_success = data.get('judge_success', None)
+
+            if prefer_original and 'original_title' in data and 'original_content' in data:
+                # Prefer the original content
+                title = data.get('original_title', 'Original title not found')
+                content = data.get('original_content', 'Original content not found')
+                # Prefer the original tags
+                tags = data.get('original_tags', data.get('tags', 'Tags not found'))
+                print(f"  - Using original content (prefer_original=True)")
+            elif judge_success is True and not prefer_original:
+                # Use the reviewed content
+                title = data.get('title', 'Title not found')
+                content = data.get('content', 'Content not found')
+                tags = data.get('tags', 'Tags not found')
+                print(f"  - Using reviewed content (judge_success=True)")
+            elif 'original_title' in data and 'original_content' in data:
+                # Use the original content
+                title = data.get('original_title', 'Original title not found')
+                content = data.get('original_content', 'Original content not found')
+                # Prefer the original tags
+                tags = data.get('original_tags', data.get('tags', 'Tags not found'))
+                print(f"  - Using original content (judge_success={judge_success})")
+            else:
+                # No original fields; use the regular fields
+                title = data.get('title', 'Title not found')
+                content = data.get('content', 'Content not found')
+                tags = data.get('tags', 'Tags not found')
+                print(f"  - Using regular content (no judge result)")
+
+            # Resolve the tag/tags field duplication; per the corrected logic, only tags is used
+            if not tags and 'tag' in data:
+                tags = data.get('tag', 'Tags not found')
+                print(f"  - Using the tag field as tags (to be unified into tags in a later version)")
 
-        # Strip Markdown formatting
+        # Strip Markdown formatting while preserving line breaks
         content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
 
-        # Assemble the output text
-        return f"{title}\n\n{content_no_format}\n\n{tags}", None
+        # Assemble the output text, preserving all line breaks in the content
+        result = ""
+        if title:
+            result += title + "\n\n"
+        if content_no_format:
+            result += content_no_format
+        if tags and tags != "Tags not found":
+            result += "\n\n" + tags
+
+        return result, None
     except json.JSONDecodeError:
print(f" - 错误: JSON 格式无效: {json_path}") return None, f"无效的 JSON 格式: {json_path}" @@ -200,6 +263,41 @@ def convert_json_to_txt_content(json_path, prefer_original=False): print(f" - 错误: 处理 JSON 时出错: {e}") return None, f"处理 JSON 时出错: {e}" +def process_txt_content(txt_path): + """ + 直接读取TXT文件内容,移除Markdown格式,并返回处理后的文本。 + + Args: + txt_path: TXT文件路径 + + Returns: + tuple: (处理后的内容,错误信息) + """ + print(f" - 正在读取TXT: {txt_path}") + if not os.path.exists(txt_path): + print(f" - 警告: TXT文件不存在: {txt_path}") + return None, f"文件未找到: {txt_path}" + + try: + # 读取TXT文件内容 + with open(txt_path, 'r', encoding='utf-8') as f_txt: + content = f_txt.read() + + # 移除Markdown格式,但保留换行符 + # 处理粗体 + content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content) + # 处理斜体 + content_no_format = re.sub(r'\*(.*?)\*', r'\1', content_no_format) + # 处理链接 [文本](链接) + content_no_format = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', content_no_format) + # 处理标题 # 文本 + content_no_format = re.sub(r'^#+ (.*?)$', r'\1', content_no_format, flags=re.MULTILINE) + + return content_no_format, None + except Exception as e: + print(f" - 错误: 处理TXT时出错: {e}") + return None, f"处理TXT时出错: {e}" + def load_topic_data(source_dir, run_id): """ 加载选题数据 @@ -364,14 +462,15 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina print(f" - 错误: {record['Details']}") continue - # 1. 处理article.json -> txt - json_path = os.path.join(entry_path, "article.json") - txt_path = os.path.join(output_entry_path, "article.txt") - record["ArticleJsonPath"] = json_path - record["OutputTxtPath"] = txt_path + # 1. 处理article.txt + input_txt_path = os.path.join(entry_path, "article.txt") + output_txt_path = os.path.join(output_entry_path, "article.txt") + record["OutputTxtPath"] = output_txt_path + + # 读取article.json,仅用于获取judge_status + json_path = os.path.join(entry_path, "article.json") + record["ArticleJsonPath"] = json_path - # 读取article.json - article_data = {} if os.path.exists(json_path): try: with open(json_path, 'r', encoding='utf-8') as f_json: @@ -382,28 +481,21 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina elif "judged" in article_data: record["JudgeStatus"] = "已审核" if article_data["judged"] else "未审核" except Exception as e: - print(f" - 错误: 读取article.json失败: {e}") - - txt_content, error = convert_json_to_txt_content(json_path, prefer_original) + print(f" - 警告: 读取article.json失败: {e}") + + # 处理article.txt文件 + if os.path.exists(input_txt_path): + processed_content, error = process_txt_content(input_txt_path) if error: record["Status"] = "Partial" record["Details"] += f"文章处理失败: {error}; " print(f" - 错误: {record['Details']}") else: try: - with open(txt_path, 'w', encoding='utf-8') as f_txt: - f_txt.write(txt_content) - print(f" - 成功写入文本文件: {txt_path}") - - # 记录内容来源 - if prefer_original: - record["ContentSource"] = "original_preferred" - elif article_data.get("judge_success") is True: - record["ContentSource"] = "judged" - elif "original_title" in article_data: - record["ContentSource"] = "original" - else: - record["ContentSource"] = "default" + with open(output_txt_path, 'w', encoding='utf-8') as f_txt: + f_txt.write(processed_content) + print(f" - 成功写入处理后的文本文件: {output_txt_path}") + record["ContentSource"] = "txt_file" except Exception as e: record["Status"] = "Partial" @@ -411,7 +503,7 @@ def process_result_directory(source_dir, output_dir, run_id=None, prefer_origina print(f" - 错误: {record['Details']}") else: record["Status"] = "Partial" - record["Details"] += "文章JSON文件不存在; " + record["Details"] += "文章TXT文件不存在; " 
print(f" - 警告: {record['Details']}") # 2. 处理海报图片 @@ -535,8 +627,8 @@ def main(): args = parser.parse_args() # 默认值设置 - source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-15_20-01-48" - output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-15_20-01-48" + source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-19_17-51-07" + output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-19_17-51-07" run_id = args.run_id if args.run_id else os.path.basename(source) prefer_original = args.prefer_original db_path = args.db_path if args.db_path else '/root/autodl-tmp/TravelContentCreator/distribution.db' diff --git a/scripts/distribution/query_products.py b/scripts/distribution/query_products.py new file mode 100755 index 0000000..cccdd04 --- /dev/null +++ b/scripts/distribution/query_products.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sqlite3 +import csv +import sys +import argparse +from datetime import datetime + +# 数据库路径 +db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db' + +def query_products(product_name=None, object_name=None, output_file=None, show_undistributed_only=False): + """查询产品信息 + + Args: + product_name: 产品名称(可选) + object_name: 景点名称(可选) + output_file: 输出CSV文件路径(可选) + show_undistributed_only: 是否只显示未分发的内容 + + Returns: + 查询结果列表 + """ + # 连接数据库 + try: + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row # 设置结果为字典格式 + cursor = conn.cursor() + print(f"已连接到数据库: {db_path}") + except sqlite3.Error as e: + print(f"数据库连接错误: {e}") + return [] + + try: + # 构建查询条件 + conditions = [] + params = [] + + if product_name: + conditions.append("product LIKE ?") + params.append(f"%{product_name}%") + + if object_name: + conditions.append("object LIKE ?") + params.append(f"%{object_name}%") + + if show_undistributed_only: + conditions.append("is_distributed = 0") + + # 构建WHERE子句 + where_clause = " AND ".join(conditions) if conditions else "1=1" + + # 执行查询 + query = f""" + SELECT + id, entry_id, product, object, date, logic, judge_status, + output_txt_path, poster_path, article_json_path, created_at, is_distributed + FROM + contents + WHERE + {where_clause} + ORDER BY + product, object, entry_id + """ + + cursor.execute(query, params) + results = [dict(row) for row in cursor.fetchall()] + + # 获取分布统计信息 + statistics = {} + if results: + # 按产品分组统计 + cursor.execute(f""" + SELECT + product, + COUNT(*) as count, + COUNT(CASE WHEN judge_status = 1 THEN 1 END) as approved_count, + COUNT(CASE WHEN is_distributed = 1 THEN 1 END) as distributed_count + FROM + contents + WHERE + product IS NOT NULL AND product != '' AND {where_clause} + GROUP BY + product + ORDER BY + count DESC + """, params) + statistics['products'] = [dict(row) for row in cursor.fetchall()] + + # 按景点分组统计 + cursor.execute(f""" + SELECT + object, + COUNT(*) as count, + COUNT(CASE WHEN judge_status = 1 THEN 1 END) as approved_count, + COUNT(CASE WHEN is_distributed = 1 THEN 1 END) as distributed_count + FROM + contents + WHERE + object IS NOT NULL AND object != '' AND {where_clause} + GROUP BY + object + ORDER BY + count DESC + LIMIT 20 + """, params) + statistics['objects'] = [dict(row) for row in cursor.fetchall()] + + # 输出结果 + if results: + print(f"\n查询到 {len(results)} 条产品记录") + + # 输出前10条记录 + print("\n===== 查询结果 (前10条) =====") + for i, row in enumerate(results[:10], 1): + judge_status = '已通过' if row['judge_status'] == 
+                distributed = 'Distributed' if row['is_distributed'] == 1 else 'Not distributed'
+                print(f"{i}. ID: {row['entry_id']}, Product: {row['product']}, Attraction: {row['object']}, Review: {judge_status}, Distribution: {distributed}")
+
+            if len(results) > 10:
+                print(f"... {len(results) - 10} more records not shown")
+
+            # Print the statistics
+            if 'products' in statistics and statistics['products']:
+                print("\n===== Product statistics =====")
+                for prod in statistics['products']:
+                    dist_percent = (prod['distributed_count'] / prod['count'] * 100) if prod['count'] > 0 else 0
+                    approved_percent = (prod['approved_count'] / prod['count'] * 100) if prod['count'] > 0 else 0
+                    print(f"Product: {prod['product']}")
+                    print(f"  - Total content: {prod['count']}")
+                    print(f"  - Approved: {prod['approved_count']} ({approved_percent:.1f}%)")
+                    print(f"  - Distributed: {prod['distributed_count']} ({dist_percent:.1f}%)")
+                    print(f"  - Not distributed: {prod['count'] - prod['distributed_count']} ({100 - dist_percent:.1f}%)")
+
+            if 'objects' in statistics and statistics['objects'] and len(statistics['objects']) <= 10:
+                print("\n===== Attraction statistics =====")
+                for obj in statistics['objects']:
+                    dist_percent = (obj['distributed_count'] / obj['count'] * 100) if obj['count'] > 0 else 0
+                    approved_percent = (obj['approved_count'] / obj['count'] * 100) if obj['count'] > 0 else 0
+                    print(f"Attraction: {obj['object']}")
+                    print(f"  - Total content: {obj['count']}")
+                    print(f"  - Approved: {obj['approved_count']} ({approved_percent:.1f}%)")
+                    print(f"  - Distributed: {obj['distributed_count']} ({dist_percent:.1f}%)")
+                    print(f"  - Not distributed: {obj['count'] - obj['distributed_count']} ({100 - dist_percent:.1f}%)")
+
+            # Summary of distributed vs. undistributed content
+            total_count = len(results)
+            distributed_count = sum(1 for r in results if r['is_distributed'] == 1)
+            undistributed_count = total_count - distributed_count
+
+            print("\n===== Distribution summary =====")
+            print(f"Total content: {total_count}")
+            print(f"Distributed: {distributed_count} ({distributed_count/total_count*100:.1f}%)" if total_count > 0 else "Distributed: 0 (0%)")
+            print(f"Not distributed: {undistributed_count} ({undistributed_count/total_count*100:.1f}%)" if total_count > 0 else "Not distributed: 0 (0%)")
+
+            # If an output file was specified, save the results to CSV
+            if output_file:
+                # Make sure the directory exists
+                os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)
+
+                with open(output_file, 'w', newline='', encoding='utf-8-sig') as f:
+                    if results:
+                        # Determine the CSV columns
+                        fieldnames = list(results[0].keys())
+                        writer = csv.DictWriter(f, fieldnames=fieldnames)
+                        writer.writeheader()
+                        writer.writerows(results)
+                print(f"\nQuery results saved to: {output_file}")
+        else:
+            print("\nNo matching product records found")
+
+            # Check whether the database has any content records at all
+            cursor.execute("SELECT COUNT(*) as count FROM contents")
+            count = cursor.fetchone()['count']
+
+            if count == 0:
+                print("\nHint: the database contains no content records; import data first")
+            else:
+                print(f"\nHint: the database contains {count} content records, but none match the given criteria")
+
+        return results
+
+    except Exception as e:
+        print(f"Error while querying products: {e}")
+        import traceback
+        traceback.print_exc()
+        return []
+    finally:
+        conn.close()
+
+def main():
+    parser = argparse.ArgumentParser(description="Query product information in the database")
+    parser.add_argument("--product", type=str, help="filter by product name")
+    parser.add_argument("--object", type=str, help="filter by attraction name")
+    parser.add_argument("--output", type=str, help="output CSV file path")
+    parser.add_argument("--all", action="store_true", help="query all products")
+    parser.add_argument("--export-csv", action="store_true", help="export the results to a CSV file")
+    parser.add_argument("--undistributed", action="store_true", help="show only undistributed content")
+
+    args = parser.parse_args()
+
+    # Default output file
+    output_file = None
+    if args.output:
+        output_file = args.output
+    elif args.export_csv or args.all:
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        suffix = "_undistributed" if args.undistributed else ""
+        output_file = f"/root/autodl-tmp/TravelContentCreator/output/product_query{suffix}_{timestamp}.csv"
+
+    # Run the query
+    if args.product or args.object or args.all or args.undistributed:
+        results = query_products(args.product, args.object, output_file, args.undistributed)
+        return len(results) > 0
+    else:
+        print("Please provide a query condition: --product, --object, --all or --undistributed")
+        parser.print_help()
+        return False
+
+if __name__ == "__main__":
+    success = main()
+    sys.exit(0 if success else 1)
\ No newline at end of file
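
A note on the regex-based Markdown stripping that process_txt_content introduces: the bold pattern must run before the italic pattern, because r'\*(.*?)\*' applied first would consume the doubled asterisks of **bold** spans. A minimal standalone sketch of the same four rules, assuming only the Python standard library (the name strip_markdown is illustrative, not part of the patch):

import re

def strip_markdown(text: str) -> str:
    # Order matters: strip **bold** before *italic*, otherwise the italic
    # pattern would match the doubled asterisks first. Line breaks survive
    # because none of the patterns match "\n".
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)                   # **bold**
    text = re.sub(r'\*(.*?)\*', r'\1', text)                       # *italic*
    text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text)                # [text](url)
    text = re.sub(r'^#+ (.*?)$', r'\1', text, flags=re.MULTILINE)  # # heading
    return text

sample = "# Day 1\n**Check in** at the *hotel*, then see the [map](https://example.com)."
print(strip_markdown(sample))
# Day 1
# Check in at the hotel, then see the map.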
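
For reference, a hypothetical producer-side round trip for the base64 fields that convert_json_to_txt_content now consumes. The field names (title_base64, content_base64, tags_base64) match the patch; the encode_field helper and the sample payloads are made up for illustration:

import base64
import json

def encode_field(text: str) -> str:
    # UTF-8 encode, then base64; the patch reverses this with
    # base64.b64decode(...).decode('utf-8').
    return base64.b64encode(text.encode('utf-8')).decode('ascii')

article = {
    "title_base64": encode_field("示例标题"),
    "content_base64": encode_field("**加粗**正文\n第二行"),
    "tags_base64": encode_field("#旅行 #酒店"),
}
print(json.dumps(article))

# Decoding, as convert_json_to_txt_content does it:
title = base64.b64decode(article["title_base64"]).decode('utf-8')
assert title == "示例标题"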
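
The condition assembly in query_products keeps user input out of the SQL text by pairing each LIKE clause with a bound parameter. A self-contained sketch of the same pattern against an in-memory SQLite database (build_where and the three-column table are illustrative, not the real distribution.db schema):

import sqlite3

def build_where(product=None, obj=None, undistributed_only=False):
    # Each filter contributes a placeholder clause plus its bound value, so
    # user input never lands in the SQL string itself.
    conditions, params = [], []
    if product:
        conditions.append("product LIKE ?")
        params.append(f"%{product}%")
    if obj:
        conditions.append("object LIKE ?")
        params.append(f"%{obj}%")
    if undistributed_only:
        conditions.append("is_distributed = 0")
    return (" AND ".join(conditions) if conditions else "1=1"), params

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contents (product TEXT, object TEXT, is_distributed INTEGER)")
conn.execute("INSERT INTO contents VALUES ('亲子套票', '北洛秘境盛季酒店', 0)")
where, params = build_where(product="亲子", undistributed_only=True)
rows = conn.execute(f"SELECT * FROM contents WHERE {where}", params).fetchall()
print(where, params, len(rows))  # product LIKE ? AND is_distributed = 0 ['%亲子%'] 1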