#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Extract per-entry results from a run directory and render them elsewhere.

The source directory is expected to contain sub-directories named ``i_j``
(two integers joined by an underscore).  For every such entry the script
copies / converts the following artefacts into ``<output>/<entry>/``:

* ``article.json``                    -> ``article.txt`` (plus a copy of the JSON)
* ``poster/poster.jpg``               -> ``poster.jpg`` (+ ``poster_metadata.json``)
* ``collage_img/collage.png``         -> ``collage.png`` (+ ``collage_metadata.json``)
* ``image/additional_*.jpg``          -> ``additional_images/`` (+ per-image metadata)

A ``manifest_<run_id>.csv`` summarising every entry is written to the output
directory.
"""

import os
import json
import shutil
import csv
import traceback
import re
import argparse
from datetime import datetime

# Column order of the manifest CSV; also the keys of every per-entry record.
MANIFEST_COLUMNS = [
    "EntryID",
    "SourcePath",
    "ArticleJsonPath",
    "OutputTxtPath",
    "PosterPath",
    "CollagePath",
    "AdditionalImagesCount",
    "Status",
    "Details",
]

# Entry directories are named "<int>_<int>".
_ENTRY_DIR_PATTERN = re.compile(r"^\d+_\d+$")

# Markdown bold markers: **text** -> text.
_BOLD_PATTERN = re.compile(r'\*\*(.*?)\*\*')

_ADDITIONAL_PREFIX = "additional_"
_ADDITIONAL_SUFFIX = ".jpg"


def _format_tags(tags):
    """Return *tags* in a form suitable for plain-text output.

    A list/tuple of tags is joined with single spaces instead of being
    rendered as a Python list repr.  Any other value is returned unchanged.
    NOTE(review): the space separator is an assumption -- confirm against the
    format the downstream consumer of article.txt expects.
    """
    if isinstance(tags, (list, tuple)):
        return " ".join(str(tag) for tag in tags)
    return tags


def convert_json_to_txt_content(json_path):
    """Read an article JSON file and build its plain-text rendering.

    The ``title``, ``content`` and ``tags`` (falling back to ``tag``) fields
    are extracted, Markdown bold markers are stripped from the content, and
    the three parts are joined with blank lines.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        A ``(text, error)`` tuple.  On success ``error`` is ``None``; on
        failure ``text`` is ``None`` and ``error`` is a message describing
        the problem.  This function never raises for file/format problems.
    """
    print(f" - 正在读取 JSON: {json_path}")
    if not os.path.exists(json_path):
        print(f" - 警告: JSON 文件不存在: {json_path}")
        return None, f"文件未找到: {json_path}"

    try:
        with open(json_path, 'r', encoding='utf-8') as f_json:
            data = json.load(f_json)

        # Missing fields are replaced by visible placeholders rather than
        # failing, so a partially filled article still produces a text file.
        title = data.get('title', '未找到标题')
        content = data.get('content', '未找到内容')
        tags = _format_tags(data.get('tags', data.get('tag', '未找到标签')))

        content_no_format = _BOLD_PATTERN.sub(r'\1', content)

        return f"{title}\n\n{content_no_format}\n\n{tags}", None
    except json.JSONDecodeError:
        print(f" - 错误: JSON 格式无效: {json_path}")
        return None, f"无效的 JSON 格式: {json_path}"
    except Exception as e:
        # Deliberately broad: a non-object JSON root or non-string content
        # must be reported for this entry, not abort the whole batch.
        print(f" - 错误: 处理 JSON 时出错: {e}")
        return None, f"处理 JSON 时出错: {e}"


def _entry_sort_key(entry_name):
    """Sort key ordering ``i_j`` names numerically (``2_1`` before ``10_1``)."""
    return tuple(int(part) for part in entry_name.split("_")), entry_name


def _new_record(entry, entry_path):
    """Return a fresh manifest record for *entry* in the 'Processing' state."""
    return {
        "EntryID": entry,
        "SourcePath": entry_path,
        "ArticleJsonPath": "",
        "OutputTxtPath": "",
        "PosterPath": "",
        "CollagePath": "",
        "AdditionalImagesCount": 0,
        "Status": "Processing",
        "Details": "",
    }


def _flag_partial(record, message, level="错误"):
    """Mark *record* as partially processed and log *message* once.

    The message is appended to the record's cumulative ``Details`` column,
    but only the new message is printed so earlier problems are not repeated
    in the console output.
    """
    record["Status"] = "Partial"
    record["Details"] += f"{message}; "
    print(f" - {level}: {message}")


def _export_article(entry_path, output_entry_path, record):
    """Convert ``article.json`` to ``article.txt`` and copy the original JSON."""
    json_path = os.path.join(entry_path, "article.json")
    txt_path = os.path.join(output_entry_path, "article.txt")
    record["ArticleJsonPath"] = json_path
    record["OutputTxtPath"] = txt_path

    if not os.path.exists(json_path):
        _flag_partial(record, "文章JSON文件不存在", level="警告")
        return

    txt_content, error = convert_json_to_txt_content(json_path)
    if error:
        _flag_partial(record, f"文章处理失败: {error}")
        return

    try:
        with open(txt_path, 'w', encoding='utf-8') as f_txt:
            f_txt.write(txt_content)
        print(f" - 成功写入文本文件: {txt_path}")

        # Keep the original JSON next to the rendered text.
        json_output_path = os.path.join(output_entry_path, "article.json")
        shutil.copy2(json_path, json_output_path)
        print(f" - 复制原始JSON文件: {json_output_path}")
    except Exception as e:
        # Best effort: record the failure and carry on with the other assets.
        _flag_partial(record, f"写入文本文件失败: {e}")


def _export_asset(asset_dir, image_name, metadata_name, output_entry_path,
                  record, path_column, label, metadata_label):
    """Copy one image (and its optional metadata JSON) into the output entry.

    Used for both the poster and the collage, which differ only in file
    names, manifest column and the labels used in log messages.  The output
    path is recorded in ``record[path_column]`` whether or not the source
    image exists.
    """
    source_image = os.path.join(asset_dir, image_name)
    output_image = os.path.join(output_entry_path, image_name)
    record[path_column] = output_image

    if not os.path.exists(source_image):
        _flag_partial(record, f"{label}不存在", level="警告")
        return

    try:
        shutil.copy2(source_image, output_image)
        print(f" - 成功复制{label}: {output_image}")

        source_metadata = os.path.join(asset_dir, metadata_name)
        if os.path.exists(source_metadata):
            output_metadata = os.path.join(output_entry_path, metadata_name)
            shutil.copy2(source_metadata, output_metadata)
            print(f" - 复制{metadata_label}: {output_metadata}")
    except Exception as e:
        _flag_partial(record, f"复制{label}失败: {e}")


def _export_additional_images(entry_path, output_entry_path, record):
    """Copy ``image/additional_*.jpg`` files and their metadata JSON files.

    A missing ``image`` directory is not an error; the count is simply 0.
    """
    image_dir = os.path.join(entry_path, "image")
    output_image_dir = os.path.join(output_entry_path, "additional_images")

    if not os.path.isdir(image_dir):
        record["AdditionalImagesCount"] = 0
        print(" - 没有找到额外图片目录")
        return

    try:
        os.makedirs(output_image_dir, exist_ok=True)
        image_count = 0

        for filename in sorted(os.listdir(image_dir)):
            if not (filename.startswith(_ADDITIONAL_PREFIX)
                    and filename.endswith(_ADDITIONAL_SUFFIX)):
                continue

            shutil.copy2(os.path.join(image_dir, filename),
                         os.path.join(output_image_dir, filename))
            image_count += 1
            # Updated per file so the manifest stays accurate even if a
            # later copy fails.
            record["AdditionalImagesCount"] = image_count

            # Swap only the trailing extension; str.replace would also
            # rewrite ".jpg" occurring earlier in the name.
            stem = filename[:-len(_ADDITIONAL_SUFFIX)]
            metadata_filename = f"{stem}_metadata.json"
            metadata_path = os.path.join(image_dir, metadata_filename)
            if os.path.exists(metadata_path):
                shutil.copy2(metadata_path,
                             os.path.join(output_image_dir, metadata_filename))

        print(f" - 复制了 {image_count} 张额外图片到: {output_image_dir}")
    except Exception as e:
        _flag_partial(record, f"处理额外图片时出错: {e}")


def _process_entry(entry, source_dir, output_dir):
    """Process a single ``i_j`` entry and return its manifest record."""
    entry_path = os.path.join(source_dir, entry)
    output_entry_path = os.path.join(output_dir, entry)

    print(f"\n处理条目: {entry}")
    record = _new_record(entry, entry_path)

    try:
        os.makedirs(output_entry_path, exist_ok=True)
    except Exception as e:
        # Nothing can be written for this entry; report it and move on.
        record["Status"] = "Failed"
        record["Details"] = f"创建输出目录失败: {e}"
        print(f" - 错误: {record['Details']}")
        return record

    # 1. article.json -> article.txt
    _export_article(entry_path, output_entry_path, record)

    # 2. poster image
    _export_asset(
        os.path.join(entry_path, "poster"), "poster.jpg",
        "poster_metadata.json", output_entry_path, record,
        path_column="PosterPath", label="海报图片", metadata_label="海报元数据",
    )

    # 3. collage image
    _export_asset(
        os.path.join(entry_path, "collage_img"), "collage.png",
        "collage_metadata.json", output_entry_path, record,
        path_column="CollagePath", label="拼贴图", metadata_label="拼贴图元数据",
    )

    # 4. additional images
    _export_additional_images(entry_path, output_entry_path, record)

    # Still "Processing" means no step flagged a problem.
    if record["Status"] == "Processing":
        record["Status"] = "Success"
        record["Details"] = "处理成功完成"

    return record


def _write_manifest(csv_path, rows):
    """Write *rows* to *csv_path*; failures are logged, not raised."""
    try:
        print(f"\n正在写入清单CSV: {csv_path}")
        # utf-8-sig writes a BOM at the start of the file.
        with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv:
            csv.writer(f_csv).writerows(rows)
        print("清单CSV生成成功")
    except Exception as e:
        print(f"写入CSV文件时出错: {e}")
        traceback.print_exc()


def process_result_directory(source_dir, output_dir, run_id=None):
    """Extract every ``i_j`` entry of *source_dir* into *output_dir*.

    Args:
        source_dir: Directory containing the ``i_j`` entry sub-directories.
        output_dir: Directory to render into; created if missing.
        run_id: Optional run identifier used in the manifest file name.
            Defaults to the final path component of *source_dir*.

    Returns:
        None.  Results are written to *output_dir*; if the source directory
        does not exist or holds no entry directories, a message is printed
        and no manifest is written.
    """
    if not os.path.isdir(source_dir):
        print(f"错误: 源目录不存在: {source_dir}")
        return

    os.makedirs(output_dir, exist_ok=True)
    print(f"确保输出目录存在: {output_dir}")

    if not run_id:
        # Normalise first: basename("runs/x/") would be "" and yield a
        # manifest called "manifest_.csv".
        run_id = os.path.basename(os.path.abspath(source_dir))

    csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv")

    # Numeric ordering keeps the manifest in natural i_j order.
    entries = sorted(
        (
            name for name in os.listdir(source_dir)
            if _ENTRY_DIR_PATTERN.match(name)
            and os.path.isdir(os.path.join(source_dir, name))
        ),
        key=_entry_sort_key,
    )

    if not entries:
        print("警告: 在源目录中未找到任何i_j格式的子目录")
        return

    print(f"找到 {len(entries)} 个条目目录")

    csv_data = [list(MANIFEST_COLUMNS)]
    for entry in entries:
        record = _process_entry(entry, source_dir, output_dir)
        csv_data.append([record[col] for col in MANIFEST_COLUMNS])

    _write_manifest(csv_path, csv_data)

    print(f"\n处理完成. 共处理 {len(entries)} 个条目.")
    print(f"结果保存在: {output_dir}")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Optional argument list; defaults to ``sys.argv[1:]`` when
            ``None`` (argparse's standard behaviour).
    """
    parser = argparse.ArgumentParser(description="从TravelContentCreator结果目录提取内容并渲染到指定目录")
    parser.add_argument("--source", type=str, required=True, help="源目录路径")
    parser.add_argument("--output", type=str, required=True, help="输出目录路径")
    parser.add_argument("--run-id", type=str, help="自定义运行ID")

    args = parser.parse_args(argv)

    print("-" * 60)
    print("开始提取和渲染流程")
    print(f"源目录: {args.source}")
    print(f"输出目录: {args.output}")
    if args.run_id:
        print(f"运行ID: {args.run_id}")
    print("-" * 60)

    process_result_directory(args.source, args.output, args.run_id)

    print("\n脚本执行完毕.")


if __name__ == "__main__":
    main()