#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import base64
import csv
import json
import logging
import os
import re
import shutil
import sqlite3
import traceback
from datetime import datetime

# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[logging.StreamHandler()]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 内置数据库记录功能
|
||
def init_database(db_path):
|
||
"""初始化数据库,创建表结构"""
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
conn.execute("PRAGMA foreign_keys = OFF") # 禁用外键约束
|
||
cursor = conn.cursor()
|
||
|
||
# 创建内容表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS contents (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
entry_id TEXT NOT NULL UNIQUE,
|
||
output_txt_path TEXT,
|
||
poster_path TEXT,
|
||
article_json_path TEXT,
|
||
product TEXT,
|
||
object TEXT,
|
||
date TEXT,
|
||
logic TEXT,
|
||
judge_status INTEGER,
|
||
is_distributed INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
""")
|
||
|
||
# 创建索引
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_contents_entry_id ON contents(entry_id)")
|
||
|
||
conn.commit()
|
||
logger.info("数据库初始化成功")
|
||
return conn
|
||
except sqlite3.Error as e:
|
||
logger.error(f"初始化数据库失败: {e}")
|
||
return None
|
||
|
||
def record_to_database(
|
||
db_path,
|
||
entry_id,
|
||
output_txt_path=None,
|
||
poster_path=None,
|
||
article_json_path=None,
|
||
product=None,
|
||
object=None,
|
||
date=None,
|
||
logic=None,
|
||
judge_status=None,
|
||
is_distributed=0
|
||
):
|
||
"""将内容记录到数据库"""
|
||
try:
|
||
# 检查数据库是否存在,如果不存在则初始化
|
||
if not os.path.exists(db_path):
|
||
logger.info(f"数据库文件不存在: {db_path},将自动创建")
|
||
conn = init_database(db_path)
|
||
if not conn:
|
||
return False
|
||
else:
|
||
try:
|
||
conn = sqlite3.connect(db_path)
|
||
conn.execute("PRAGMA foreign_keys = OFF") # 禁用外键约束
|
||
except sqlite3.Error as e:
|
||
logger.error(f"连接数据库失败: {e}")
|
||
return False
|
||
|
||
try:
|
||
cursor = conn.cursor()
|
||
|
||
# 准备数据
|
||
data = (
|
||
entry_id,
|
||
output_txt_path or '',
|
||
poster_path or '',
|
||
article_json_path or '',
|
||
product or '',
|
||
object or '',
|
||
date or '',
|
||
logic or '',
|
||
judge_status if judge_status is not None else None,
|
||
is_distributed
|
||
)
|
||
|
||
# 插入或更新内容
|
||
cursor.execute("""
|
||
INSERT OR REPLACE INTO contents
|
||
(entry_id, output_txt_path, poster_path, article_json_path,
|
||
product, object, date, logic, judge_status, is_distributed)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", data)
|
||
|
||
conn.commit()
|
||
logger.info(f"已将内容 {entry_id} 记录到数据库")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"记录内容到数据库失败: {e}")
|
||
try:
|
||
conn.rollback()
|
||
except:
|
||
pass
|
||
return False
|
||
finally:
|
||
try:
|
||
conn.close()
|
||
except:
|
||
pass
|
||
|
||
except Exception as e:
|
||
logger.error(f"记录提取内容时发生错误: {e}")
|
||
return False
|
||
|
||
def convert_json_to_txt_content(json_path, prefer_original=False):
|
||
"""
|
||
读取 JSON 文件,提取标题、内容和标签,移除 Markdown 格式,
|
||
并返回格式化文本。
|
||
|
||
根据JSON文件中的状态字段决定使用什么内容:
|
||
- 如果judged=True,使用审核后内容
|
||
- 如果judged=False,使用原始内容
|
||
|
||
所有情况都优先使用base64编码的字段,因为这些字段能正确保留特殊字符和换行符
|
||
|
||
Args:
|
||
json_path: JSON文件路径
|
||
prefer_original: 参数保留但不再使用
|
||
"""
|
||
print(f" - 正在读取 JSON: {json_path}")
|
||
if not os.path.exists(json_path):
|
||
print(f" - 警告: JSON 文件不存在: {json_path}")
|
||
return None, f"文件未找到: {json_path}"
|
||
|
||
try:
|
||
with open(json_path, 'r', encoding='utf-8') as f_json:
|
||
data = json.load(f_json)
|
||
|
||
# 提取状态字段
|
||
judged = data.get('judged', False)
|
||
|
||
print(f" - 文件状态: judged={judged}")
|
||
|
||
# 初始化变量
|
||
title = None
|
||
content = None
|
||
original_title = None
|
||
original_content = None
|
||
tags = None
|
||
original_tags = None
|
||
|
||
# =================解码所有可能的base64字段=================
|
||
# 解码标题和内容字段
|
||
if "title_base64" in data:
|
||
try:
|
||
title = base64.b64decode(data["title_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码审核后标题")
|
||
except Exception as e:
|
||
print(f" - 警告: 标题base64解码失败: {e}")
|
||
|
||
if "content_base64" in data:
|
||
try:
|
||
content = base64.b64decode(data["content_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码审核后内容")
|
||
except Exception as e:
|
||
print(f" - 警告: 内容base64解码失败: {e}")
|
||
|
||
# 解码原始标题和内容字段
|
||
if "original_title_base64" in data:
|
||
try:
|
||
original_title = base64.b64decode(data["original_title_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码原始标题")
|
||
except Exception as e:
|
||
print(f" - 警告: 原始标题base64解码失败: {e}")
|
||
|
||
if "original_content_base64" in data:
|
||
try:
|
||
original_content = base64.b64decode(data["original_content_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码原始内容")
|
||
except Exception as e:
|
||
print(f" - 警告: 原始内容base64解码失败: {e}")
|
||
|
||
# 解码标签字段
|
||
if "tags_base64" in data:
|
||
try:
|
||
tags = base64.b64decode(data["tags_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码审核后标签")
|
||
except Exception as e:
|
||
print(f" - 警告: 标签base64解码失败: {e}")
|
||
|
||
if "original_tags_base64" in data:
|
||
try:
|
||
original_tags = base64.b64decode(data["original_tags_base64"]).decode('utf-8')
|
||
print(f" - 成功从base64解码原始标签")
|
||
except Exception as e:
|
||
print(f" - 警告: 原始标签base64解码失败: {e}")
|
||
|
||
# =================回退到非base64字段=================
|
||
# 如果base64解码失败,尝试使用普通字段
|
||
if title is None and "title" in data:
|
||
title = data["title"]
|
||
print(f" - 使用普通字段标题")
|
||
|
||
if content is None and "content" in data:
|
||
content = data["content"]
|
||
print(f" - 使用普通字段内容")
|
||
|
||
if original_title is None and "original_title" in data:
|
||
original_title = data["original_title"]
|
||
print(f" - 使用普通字段原始标题")
|
||
|
||
if original_content is None and "original_content" in data:
|
||
original_content = data["original_content"]
|
||
print(f" - 使用普通字段原始内容")
|
||
|
||
if tags is None and "tags" in data:
|
||
tags = data["tags"]
|
||
print(f" - 使用普通字段标签")
|
||
elif tags is None and "tag" in data:
|
||
tags = data["tag"]
|
||
print(f" - 使用普通tag字段作为标签")
|
||
|
||
if original_tags is None and "original_tags" in data:
|
||
original_tags = data["original_tags"]
|
||
print(f" - 使用普通字段原始标签")
|
||
|
||
# =================根据状态字段决定使用哪些内容=================
|
||
final_title = None
|
||
final_content = None
|
||
final_tags = None
|
||
|
||
# 简化逻辑:如果已审核,使用审核后内容;否则使用原始内容
|
||
if judged:
|
||
print(f" - 使用审核后内容 (judged=True)")
|
||
final_title = title
|
||
final_content = content
|
||
final_tags = tags
|
||
else:
|
||
print(f" - 使用原始内容 (judged=False)")
|
||
final_title = original_title
|
||
final_content = original_content
|
||
final_tags = original_tags
|
||
|
||
# 确保所有字段都有值
|
||
final_title = final_title or "未找到标题"
|
||
final_content = final_content or "未找到内容"
|
||
final_tags = final_tags or "未找到标签"
|
||
|
||
# 移除Markdown格式,但保留换行符
|
||
content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', final_content)
|
||
|
||
# 组合输出文本,保留内容的所有换行符
|
||
result = final_title + "\n\n" + content_no_format
|
||
|
||
if final_tags and final_tags != "未找到标签":
|
||
result += "\n\n" + final_tags
|
||
|
||
print(f" - 内容处理完成,最终文本长度: {len(result)} 字符")
|
||
return result, None
|
||
|
||
except json.JSONDecodeError:
|
||
print(f" - 错误: JSON 格式无效: {json_path}")
|
||
return None, f"无效的 JSON 格式: {json_path}"
|
||
except Exception as e:
|
||
print(f" - 错误: 处理 JSON 时出错: {e}")
|
||
traceback.print_exc()
|
||
return None, f"处理 JSON 时出错: {e}"
|
||
|
||
def process_txt_content(txt_path):
|
||
"""
|
||
直接读取TXT文件内容,移除Markdown格式,并返回处理后的文本。
|
||
|
||
Args:
|
||
txt_path: TXT文件路径
|
||
|
||
Returns:
|
||
tuple: (处理后的内容,错误信息)
|
||
"""
|
||
print(f" - 正在读取TXT: {txt_path}")
|
||
if not os.path.exists(txt_path):
|
||
print(f" - 警告: TXT文件不存在: {txt_path}")
|
||
return None, f"文件未找到: {txt_path}"
|
||
|
||
try:
|
||
# 读取TXT文件内容
|
||
with open(txt_path, 'r', encoding='utf-8') as f_txt:
|
||
content = f_txt.read()
|
||
|
||
# 移除Markdown格式,但保留换行符
|
||
# 处理粗体
|
||
content_no_format = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
|
||
# 处理斜体
|
||
content_no_format = re.sub(r'\*(.*?)\*', r'\1', content_no_format)
|
||
# 处理链接 [文本](链接)
|
||
content_no_format = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', content_no_format)
|
||
# 处理标题 # 文本
|
||
content_no_format = re.sub(r'^#+ (.*?)$', r'\1', content_no_format, flags=re.MULTILINE)
|
||
|
||
return content_no_format, None
|
||
except Exception as e:
|
||
print(f" - 错误: 处理TXT时出错: {e}")
|
||
return None, f"处理TXT时出错: {e}"
|
||
|
||
def load_topic_data(source_dir, run_id):
|
||
"""
|
||
加载选题数据
|
||
|
||
Args:
|
||
source_dir: 源目录路径
|
||
run_id: 运行ID
|
||
|
||
Returns:
|
||
dict: 以topic_index为键的选题数据字典
|
||
"""
|
||
topic_file_path = os.path.join(source_dir, f"tweet_topic_{run_id}.json")
|
||
topic_data = {}
|
||
|
||
if os.path.exists(topic_file_path):
|
||
try:
|
||
with open(topic_file_path, 'r', encoding='utf-8') as f:
|
||
topics = json.load(f)
|
||
|
||
# 将选题数据转换为以index为键的字典
|
||
for topic in topics:
|
||
index = topic.get("index")
|
||
if index:
|
||
topic_data[index] = topic
|
||
|
||
print(f"成功加载选题数据,共{len(topic_data)}条")
|
||
except Exception as e:
|
||
print(f"加载选题数据时出错: {e}")
|
||
else:
|
||
print(f"警告: 未找到选题文件: {topic_file_path}")
|
||
|
||
return topic_data
|
||
|
||
def process_result_directory(source_dir, output_dir, run_id=None, prefer_original=False, db_path=None):
|
||
"""
|
||
处理指定的结果目录,提取内容并渲染到输出目录。
|
||
|
||
Args:
|
||
source_dir: 源目录路径,包含i_j子目录
|
||
output_dir: 输出目录路径
|
||
run_id: 可选的运行ID,如果不提供则使用源目录名
|
||
prefer_original: 是否优先使用原始内容,无视judge_success结果
|
||
db_path: 数据库路径,若不提供则使用默认路径
|
||
"""
|
||
if not os.path.isdir(source_dir):
|
||
print(f"错误: 源目录不存在: {source_dir}")
|
||
return
|
||
|
||
# 设置默认数据库路径
|
||
if db_path is None:
|
||
db_path = '/root/autodl-tmp/TravelContentCreator/distribution.db'
|
||
|
||
# 数据库是否启用
|
||
db_enabled = True
|
||
|
||
# 创建输出目录
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
print(f"确保输出目录存在: {output_dir}")
|
||
|
||
# 提取run_id
|
||
if not run_id:
|
||
run_id = os.path.basename(source_dir)
|
||
|
||
# 加载选题数据
|
||
topic_data = load_topic_data(source_dir, run_id)
|
||
|
||
# 创建CSV清单,添加选题相关字段
|
||
csv_path = os.path.join(output_dir, f"manifest_{run_id}.csv")
|
||
csv_data = [
|
||
[
|
||
"EntryID",
|
||
"TopicIndex",
|
||
"VariantIndex",
|
||
"Date",
|
||
"Logic",
|
||
"Object",
|
||
"Product",
|
||
"ProductLogic",
|
||
"Style",
|
||
"StyleLogic",
|
||
"TargetAudience",
|
||
"TargetAudienceLogic",
|
||
"SourcePath",
|
||
"ArticleJsonPath",
|
||
"OutputTxtPath",
|
||
"PosterPath",
|
||
"AdditionalImagesCount",
|
||
"Status",
|
||
"Details",
|
||
"JudgeStatus",
|
||
"ContentSource",
|
||
"RecordedInDB",
|
||
"IsDistributed"
|
||
]
|
||
]
|
||
|
||
# 查找所有i_j目录
|
||
entry_pattern = re.compile(r"^(\d+)_(\d+)$")
|
||
entries = []
|
||
|
||
for item in os.listdir(source_dir):
|
||
item_path = os.path.join(source_dir, item)
|
||
match = entry_pattern.match(item)
|
||
if os.path.isdir(item_path) and match:
|
||
entries.append(item)
|
||
|
||
if not entries:
|
||
print(f"警告: 在源目录中未找到任何i_j格式的子目录")
|
||
return
|
||
|
||
print(f"找到 {len(entries)} 个条目目录")
|
||
|
||
# 处理每个条目
|
||
for entry in sorted(entries):
|
||
entry_path = os.path.join(source_dir, entry)
|
||
output_entry_path = os.path.join(output_dir, entry)
|
||
|
||
print(f"\n处理条目: {entry}")
|
||
|
||
# 解析topic_index和variant_index
|
||
match = entry_pattern.match(entry)
|
||
topic_index = match.group(1)
|
||
variant_index = match.group(2)
|
||
|
||
# 获取该话题的选题信息
|
||
topic_info = topic_data.get(topic_index, {})
|
||
|
||
# 创建记录
|
||
record = {
|
||
"EntryID": entry,
|
||
"TopicIndex": topic_index,
|
||
"VariantIndex": variant_index,
|
||
"Date": topic_info.get("date", ""),
|
||
"Logic": topic_info.get("logic", ""),
|
||
"Object": topic_info.get("object", ""),
|
||
"Product": topic_info.get("product", ""),
|
||
"ProductLogic": topic_info.get("product_logic", ""),
|
||
"Style": topic_info.get("style", ""),
|
||
"StyleLogic": topic_info.get("style_logic", ""),
|
||
"TargetAudience": topic_info.get("target_audience", ""),
|
||
"TargetAudienceLogic": topic_info.get("target_audience_logic", ""),
|
||
"SourcePath": entry_path,
|
||
"ArticleJsonPath": "",
|
||
"OutputTxtPath": "",
|
||
"PosterPath": "",
|
||
"AdditionalImagesCount": 0,
|
||
"Status": "Processing",
|
||
"Details": "",
|
||
"JudgeStatus": "",
|
||
"ContentSource": "unknown",
|
||
"RecordedInDB": "No",
|
||
"IsDistributed": "No"
|
||
}
|
||
|
||
# 创建输出条目目录
|
||
try:
|
||
os.makedirs(output_entry_path, exist_ok=True)
|
||
except Exception as e:
|
||
record["Status"] = "Failed"
|
||
record["Details"] = f"创建输出目录失败: {e}"
|
||
csv_data.append([record[col] for col in csv_data[0]])
|
||
print(f" - 错误: {record['Details']}")
|
||
continue
|
||
|
||
# 1. 处理article内容 - 优先使用JSON文件
|
||
output_txt_path = os.path.join(output_entry_path, "article.txt")
|
||
record["OutputTxtPath"] = output_txt_path
|
||
|
||
# 读取article.json
|
||
json_path = os.path.join(entry_path, "article.json")
|
||
record["ArticleJsonPath"] = json_path
|
||
|
||
content_processed = False
|
||
|
||
# 优先从JSON提取内容
|
||
if os.path.exists(json_path):
|
||
try:
|
||
# 从JSON文件提取审核状态
|
||
with open(json_path, 'r', encoding='utf-8') as f_json:
|
||
article_data = json.load(f_json)
|
||
# 提取judge_success状态
|
||
if "judge_success" in article_data:
|
||
record["JudgeStatus"] = str(article_data["judge_success"])
|
||
elif "judged" in article_data:
|
||
record["JudgeStatus"] = "已审核" if article_data["judged"] else "未审核"
|
||
|
||
# 使用convert_json_to_txt_content函数处理JSON文件
|
||
processed_content, error = convert_json_to_txt_content(json_path, prefer_original)
|
||
|
||
if error:
|
||
print(f" - 警告: 从JSON提取内容失败: {error}")
|
||
else:
|
||
try:
|
||
with open(output_txt_path, 'w', encoding='utf-8') as f_txt:
|
||
f_txt.write(processed_content)
|
||
print(f" - 成功从JSON提取并写入内容到: {output_txt_path}")
|
||
record["ContentSource"] = "json_file"
|
||
content_processed = True
|
||
except Exception as e:
|
||
print(f" - 警告: 写入从JSON提取的内容时出错: {e}")
|
||
except Exception as e:
|
||
print(f" - 警告: 处理JSON文件时出错: {e}")
|
||
|
||
# 如果从JSON提取内容失败,尝试使用现有的TXT文件
|
||
if not content_processed:
|
||
input_txt_path = os.path.join(entry_path, "article.txt")
|
||
if os.path.exists(input_txt_path):
|
||
processed_content, error = process_txt_content(input_txt_path)
|
||
if error:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += f"文章处理失败: {error}; "
|
||
print(f" - 错误: {record['Details']}")
|
||
else:
|
||
try:
|
||
with open(output_txt_path, 'w', encoding='utf-8') as f_txt:
|
||
f_txt.write(processed_content)
|
||
print(f" - 成功写入处理后的文本文件: {output_txt_path}")
|
||
record["ContentSource"] = "txt_file"
|
||
content_processed = True
|
||
except Exception as e:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += f"写入文本文件失败: {e}; "
|
||
print(f" - 错误: {record['Details']}")
|
||
else:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += "无法从JSON或TXT获取内容; "
|
||
print(f" - 警告: {record['Details']}")
|
||
|
||
# 2. 处理海报图片
|
||
poster_dir = os.path.join(entry_path, "poster")
|
||
poster_jpg_path = os.path.join(poster_dir, "poster.jpg")
|
||
output_poster_path = os.path.join(output_entry_path, "poster.jpg")
|
||
record["PosterPath"] = output_poster_path
|
||
|
||
if os.path.exists(poster_jpg_path):
|
||
try:
|
||
shutil.copy2(poster_jpg_path, output_poster_path)
|
||
print(f" - 成功复制海报图片: {output_poster_path}")
|
||
except Exception as e:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += f"复制海报图片失败: {e}; "
|
||
print(f" - 错误: {record['Details']}")
|
||
else:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += "海报图片不存在; "
|
||
print(f" - 警告: {record['Details']}")
|
||
|
||
# 3. 处理额外图片
|
||
image_dir = os.path.join(entry_path, "image")
|
||
output_image_dir = os.path.join(output_entry_path, "additional_images")
|
||
|
||
if os.path.exists(image_dir) and os.path.isdir(image_dir):
|
||
try:
|
||
os.makedirs(output_image_dir, exist_ok=True)
|
||
image_count = 0
|
||
|
||
for filename in os.listdir(image_dir):
|
||
if filename.startswith("additional_") and filename.endswith(".jpg"):
|
||
source_file = os.path.join(image_dir, filename)
|
||
dest_file = os.path.join(output_image_dir, filename)
|
||
|
||
# 复制图片
|
||
shutil.copy2(source_file, dest_file)
|
||
image_count += 1
|
||
|
||
record["AdditionalImagesCount"] = image_count
|
||
print(f" - 复制了 {image_count} 张额外图片到: {output_image_dir}")
|
||
except Exception as e:
|
||
record["Status"] = "Partial"
|
||
record["Details"] += f"处理额外图片时出错: {e}; "
|
||
print(f" - 错误: {record['Details']}")
|
||
else:
|
||
record["AdditionalImagesCount"] = 0
|
||
print(f" - 没有找到额外图片目录")
|
||
|
||
# 更新状态
|
||
if record["Status"] == "Processing":
|
||
record["Status"] = "Success"
|
||
record["Details"] = "处理成功完成"
|
||
|
||
# 4. 将内容记录到数据库
|
||
if db_enabled:
|
||
try:
|
||
# 准备judge_status值
|
||
if record["JudgeStatus"] == "True":
|
||
judge_status = 1
|
||
elif record["JudgeStatus"] == "False":
|
||
judge_status = 0
|
||
else:
|
||
judge_status = None
|
||
|
||
# 调用数据库记录函数
|
||
success = record_to_database(
|
||
db_path,
|
||
entry_id=record["EntryID"],
|
||
output_txt_path=record["OutputTxtPath"],
|
||
poster_path=record["PosterPath"],
|
||
article_json_path=record["ArticleJsonPath"],
|
||
product=record["Product"],
|
||
object=record["Object"],
|
||
date=record["Date"],
|
||
logic=record["Logic"],
|
||
judge_status=judge_status,
|
||
is_distributed=0 # 默认为未分发
|
||
)
|
||
|
||
if success:
|
||
record["RecordedInDB"] = "Yes"
|
||
print(f" - 成功将内容记录到数据库")
|
||
else:
|
||
record["RecordedInDB"] = "Failed"
|
||
print(f" - 警告: 内容记录到数据库失败")
|
||
except Exception as e:
|
||
record["RecordedInDB"] = "Error"
|
||
print(f" - 错误: 记录到数据库时发生异常: {e}")
|
||
traceback.print_exc() # 打印详细的异常堆栈
|
||
else:
|
||
record["RecordedInDB"] = "Disabled"
|
||
print(f" - 信息: 数据库记录功能已禁用")
|
||
|
||
# 添加记录到CSV数据
|
||
csv_data.append([record[col] for col in csv_data[0]])
|
||
|
||
# 写入CSV清单
|
||
try:
|
||
print(f"\n正在写入清单CSV: {csv_path}")
|
||
with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f_csv:
|
||
writer = csv.writer(f_csv)
|
||
writer.writerows(csv_data)
|
||
print(f"清单CSV生成成功")
|
||
except Exception as e:
|
||
print(f"写入CSV文件时出错: {e}")
|
||
traceback.print_exc()
|
||
|
||
print(f"\n处理完成. 共处理 {len(entries)} 个条目.")
|
||
print(f"结果保存在: {output_dir}")
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="从TravelContentCreator结果目录提取内容并渲染到指定目录")
|
||
parser.add_argument("--source", type=str, help="源目录路径")
|
||
parser.add_argument("--output", type=str, help="输出目录路径")
|
||
parser.add_argument("--run-id", type=str, help="自定义运行ID")
|
||
parser.add_argument("--prefer-original", action="store_true", help="优先使用原始内容,忽略审核结果")
|
||
parser.add_argument("--db-path", type=str, help="数据库路径,若不提供则使用默认路径")
|
||
parser.add_argument("--disable-db", action="store_true", help="禁用数据库记录功能")
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 默认值设置
|
||
source = args.source if args.source else "/root/autodl-tmp/TravelContentCreator/result/2025-05-21_16-18-24"
|
||
output = args.output if args.output else "/root/autodl-tmp/TravelContentCreator/output/2025-05-21_16-18-24"
|
||
run_id = args.run_id if args.run_id else os.path.basename(source)
|
||
prefer_original = args.prefer_original
|
||
db_path = args.db_path if args.db_path else '/root/autodl-tmp/TravelContentCreator/distribution.db'
|
||
|
||
print("-" * 60)
|
||
print(f"开始提取和渲染流程")
|
||
print(f"源目录: {source}")
|
||
print(f"输出目录: {output}")
|
||
print(f"运行ID: {run_id}")
|
||
if prefer_original:
|
||
print("内容模式: 优先使用原始内容")
|
||
else:
|
||
print("内容模式: 根据审核结果选择内容")
|
||
|
||
if args.disable_db:
|
||
print("数据库记录: 已禁用")
|
||
else:
|
||
print(f"数据库记录: 已启用 (路径: {db_path})")
|
||
print("-" * 60)
|
||
|
||
process_result_directory(source, output, run_id, prefer_original, db_path)
|
||
|
||
print("\n脚本执行完毕.")
|
||
|
||
if __name__ == "__main__":
|
||
main() |