分发模块第一版脚本
This commit is contained in:
parent
a290901366
commit
3372572da2
287
scripts/extract_and_render.py
Normal file
287
scripts/extract_and_render.py
Normal file
@ -0,0 +1,287 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import csv
|
||||
import traceback
|
||||
import re
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
|
||||
def convert_json_to_txt_content(json_path):
|
||||
"""
|
||||
读取 JSON 文件,提取标题、内容和标签,移除 Markdown 格式,
|
||||
并返回格式化文本。
|
||||
"""
|
||||
print(f" - 正在读取 JSON: {json_path}")
|
||||
if not os.path.exists(json_path):
|
||||
print(f" - 警告: JSON 文件不存在: {json_path}")
|
||||
return None, f"文件未找到: {json_path}"
|
||||
|
||||
try:
|
||||
with open(json_path, 'r', encoding='utf-8') as f_json:
|
||||
data = json.load(f_json)
|
||||
|
||||
# 提取字段
|
||||
title = data.get('title', '未找到标题')
|
||||
content = data.get('content', '未找到内容')
|
||||
tags = data.get('tags', data.get('tag', '未找到标签'))
|
||||
|
||||
# 移除Markdown格式
|
||||
content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
|
||||
|
||||
# 组合输出文本
|
||||
return f"{title}\n\n{content_no_format}\n\n{tags}", None
|
||||
except json.JSONDecodeError:
|
||||
print(f" - 错误: JSON 格式无效: {json_path}")
|
||||
return None, f"无效的 JSON 格式: {json_path}"
|
||||
except Exception as e:
|
||||
print(f" - 错误: 处理 JSON 时出错: {e}")
|
||||
return None, f"处理 JSON 时出错: {e}"
|
||||
|
||||
def process_result_directory(source_dir, output_dir, run_id=None):
|
||||
"""
|
||||
处理指定的结果目录,提取内容并渲染到输出目录。
|
||||
|
||||
Args:
|
||||
source_dir: 源目录路径,包含i_j子目录
|
||||
output_dir: 输出目录路径
|
||||
run_id: 可选的运行ID,如果不提供则使用源目录名
|
||||
"""
|
||||
if not os.path.isdir(source_dir):
|
||||
print(f"错误: 源目录不存在: {source_dir}")
|
||||
return
|
||||
|
||||
# 创建输出目录
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
print(f"确保输出目录存在: {output_dir}")
|
||||
|
||||
# 提取run_id
|
||||
if not run_id:
|
||||
run_id = os.path.basename(source_dir)
|
||||
|
||||
# 创建CSV清单
|
||||
csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv")
|
||||
csv_data = [
|
||||
[
|
||||
"EntryID",
|
||||
"SourcePath",
|
||||
"ArticleJsonPath",
|
||||
"OutputTxtPath",
|
||||
"PosterPath",
|
||||
"CollagePath",
|
||||
"AdditionalImagesCount",
|
||||
"Status",
|
||||
"Details"
|
||||
]
|
||||
]
|
||||
|
||||
# 查找所有i_j目录
|
||||
entry_pattern = re.compile(r"^\d+_\d+$")
|
||||
entries = []
|
||||
|
||||
for item in os.listdir(source_dir):
|
||||
item_path = os.path.join(source_dir, item)
|
||||
if os.path.isdir(item_path) and entry_pattern.match(item):
|
||||
entries.append(item)
|
||||
|
||||
if not entries:
|
||||
print(f"警告: 在源目录中未找到任何i_j格式的子目录")
|
||||
return
|
||||
|
||||
print(f"找到 {len(entries)} 个条目目录")
|
||||
|
||||
# 处理每个条目
|
||||
for entry in sorted(entries):
|
||||
entry_path = os.path.join(source_dir, entry)
|
||||
output_entry_path = os.path.join(output_dir, entry)
|
||||
|
||||
print(f"\n处理条目: {entry}")
|
||||
|
||||
# 创建记录
|
||||
record = {
|
||||
"EntryID": entry,
|
||||
"SourcePath": entry_path,
|
||||
"ArticleJsonPath": "",
|
||||
"OutputTxtPath": "",
|
||||
"PosterPath": "",
|
||||
"CollagePath": "",
|
||||
"AdditionalImagesCount": 0,
|
||||
"Status": "Processing",
|
||||
"Details": ""
|
||||
}
|
||||
|
||||
# 创建输出条目目录
|
||||
try:
|
||||
os.makedirs(output_entry_path, exist_ok=True)
|
||||
except Exception as e:
|
||||
record["Status"] = "Failed"
|
||||
record["Details"] = f"创建输出目录失败: {e}"
|
||||
csv_data.append([record[col] for col in csv_data[0]])
|
||||
print(f" - 错误: {record['Details']}")
|
||||
continue
|
||||
|
||||
# 1. 处理article.json -> txt
|
||||
json_path = os.path.join(entry_path, "article.json")
|
||||
txt_path = os.path.join(output_entry_path, "article.txt")
|
||||
record["ArticleJsonPath"] = json_path
|
||||
record["OutputTxtPath"] = txt_path
|
||||
|
||||
if os.path.exists(json_path):
|
||||
txt_content, error = convert_json_to_txt_content(json_path)
|
||||
if error:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += f"文章处理失败: {error}; "
|
||||
print(f" - 错误: {record['Details']}")
|
||||
else:
|
||||
try:
|
||||
with open(txt_path, 'w', encoding='utf-8') as f_txt:
|
||||
f_txt.write(txt_content)
|
||||
print(f" - 成功写入文本文件: {txt_path}")
|
||||
|
||||
# 同时保存原始JSON
|
||||
json_output_path = os.path.join(output_entry_path, "article.json")
|
||||
shutil.copy2(json_path, json_output_path)
|
||||
print(f" - 复制原始JSON文件: {json_output_path}")
|
||||
except Exception as e:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += f"写入文本文件失败: {e}; "
|
||||
print(f" - 错误: {record['Details']}")
|
||||
else:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += "文章JSON文件不存在; "
|
||||
print(f" - 警告: {record['Details']}")
|
||||
|
||||
# 2. 处理海报图片
|
||||
poster_dir = os.path.join(entry_path, "poster")
|
||||
poster_jpg_path = os.path.join(poster_dir, "poster.jpg")
|
||||
output_poster_path = os.path.join(output_entry_path, "poster.jpg")
|
||||
record["PosterPath"] = output_poster_path
|
||||
|
||||
if os.path.exists(poster_jpg_path):
|
||||
try:
|
||||
shutil.copy2(poster_jpg_path, output_poster_path)
|
||||
print(f" - 成功复制海报图片: {output_poster_path}")
|
||||
|
||||
# 复制元数据
|
||||
poster_metadata_path = os.path.join(poster_dir, "poster_metadata.json")
|
||||
if os.path.exists(poster_metadata_path):
|
||||
output_poster_metadata = os.path.join(output_entry_path, "poster_metadata.json")
|
||||
shutil.copy2(poster_metadata_path, output_poster_metadata)
|
||||
print(f" - 复制海报元数据: {output_poster_metadata}")
|
||||
except Exception as e:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += f"复制海报图片失败: {e}; "
|
||||
print(f" - 错误: {record['Details']}")
|
||||
else:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += "海报图片不存在; "
|
||||
print(f" - 警告: {record['Details']}")
|
||||
|
||||
# 3. 处理拼贴图
|
||||
collage_dir = os.path.join(entry_path, "collage_img")
|
||||
collage_path = os.path.join(collage_dir, "collage.png")
|
||||
output_collage_path = os.path.join(output_entry_path, "collage.png")
|
||||
record["CollagePath"] = output_collage_path
|
||||
|
||||
if os.path.exists(collage_path):
|
||||
try:
|
||||
shutil.copy2(collage_path, output_collage_path)
|
||||
print(f" - 成功复制拼贴图: {output_collage_path}")
|
||||
|
||||
# 复制元数据
|
||||
collage_metadata_path = os.path.join(collage_dir, "collage_metadata.json")
|
||||
if os.path.exists(collage_metadata_path):
|
||||
output_collage_metadata = os.path.join(output_entry_path, "collage_metadata.json")
|
||||
shutil.copy2(collage_metadata_path, output_collage_metadata)
|
||||
print(f" - 复制拼贴图元数据: {output_collage_metadata}")
|
||||
except Exception as e:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += f"复制拼贴图失败: {e}; "
|
||||
print(f" - 错误: {record['Details']}")
|
||||
else:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += "拼贴图不存在; "
|
||||
print(f" - 警告: {record['Details']}")
|
||||
|
||||
# 4. 处理额外图片
|
||||
image_dir = os.path.join(entry_path, "image")
|
||||
output_image_dir = os.path.join(output_entry_path, "additional_images")
|
||||
|
||||
if os.path.exists(image_dir) and os.path.isdir(image_dir):
|
||||
try:
|
||||
os.makedirs(output_image_dir, exist_ok=True)
|
||||
image_count = 0
|
||||
|
||||
for filename in os.listdir(image_dir):
|
||||
if filename.startswith("additional_") and filename.endswith(".jpg"):
|
||||
source_file = os.path.join(image_dir, filename)
|
||||
dest_file = os.path.join(output_image_dir, filename)
|
||||
|
||||
# 复制图片
|
||||
shutil.copy2(source_file, dest_file)
|
||||
image_count += 1
|
||||
|
||||
# 复制相关元数据
|
||||
metadata_filename = filename.replace(".jpg", "_metadata.json")
|
||||
metadata_path = os.path.join(image_dir, metadata_filename)
|
||||
if os.path.exists(metadata_path):
|
||||
dest_metadata = os.path.join(output_image_dir, metadata_filename)
|
||||
shutil.copy2(metadata_path, dest_metadata)
|
||||
|
||||
record["AdditionalImagesCount"] = image_count
|
||||
print(f" - 复制了 {image_count} 张额外图片到: {output_image_dir}")
|
||||
except Exception as e:
|
||||
record["Status"] = "Partial"
|
||||
record["Details"] += f"处理额外图片时出错: {e}; "
|
||||
print(f" - 错误: {record['Details']}")
|
||||
else:
|
||||
record["AdditionalImagesCount"] = 0
|
||||
print(f" - 没有找到额外图片目录")
|
||||
|
||||
# 更新状态
|
||||
if record["Status"] == "Processing":
|
||||
record["Status"] = "Success"
|
||||
record["Details"] = "处理成功完成"
|
||||
|
||||
# 添加记录到CSV数据
|
||||
csv_data.append([record[col] for col in csv_data[0]])
|
||||
|
||||
# 写入CSV清单
|
||||
try:
|
||||
print(f"\n正在写入清单CSV: {csv_path}")
|
||||
with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv:
|
||||
writer = csv.writer(f_csv)
|
||||
writer.writerows(csv_data)
|
||||
print(f"清单CSV生成成功")
|
||||
except Exception as e:
|
||||
print(f"写入CSV文件时出错: {e}")
|
||||
traceback.print_exc()
|
||||
|
||||
print(f"\n处理完成. 共处理 {len(entries)} 个条目.")
|
||||
print(f"结果保存在: {output_dir}")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="从TravelContentCreator结果目录提取内容并渲染到指定目录")
|
||||
parser.add_argument("--source", type=str, required=True, help="源目录路径")
|
||||
parser.add_argument("--output", type=str, required=True, help="输出目录路径")
|
||||
parser.add_argument("--run-id", type=str, help="自定义运行ID")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
print("-" * 60)
|
||||
print(f"开始提取和渲染流程")
|
||||
print(f"源目录: {args.source}")
|
||||
print(f"输出目录: {args.output}")
|
||||
if args.run_id:
|
||||
print(f"运行ID: {args.run_id}")
|
||||
print("-" * 60)
|
||||
|
||||
process_result_directory(args.source, args.output, args.run_id)
|
||||
|
||||
print("\n脚本执行完毕.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user