Changed the subtitle extraction timeline logic back to normal; added merging of the per-segment video template outputs into a single JSON file, with ids and timelines aligned. Still TODO: if the LLM outputs malformed JSON, the merge fails.

This commit is contained in:
root 2025-07-10 17:16:21 +08:00
parent 2079383951
commit 7636eca330
4 changed files with 310 additions and 41 deletions

View File

@@ -229,7 +229,93 @@ def save_result_to_txt(response_text, video_path, video_dir, save_dir="/root/autodl-tmp/video_llm"):
print(f"\n❌ 保存TXT文件失败: {e}")
return None
def save_result_to_txt(response_text, video_path, video_dir, save_dir="/root/autodl-tmp/video_llm"):
    """Save the analysis result as a TXT file"""
    # Create the save directory
    os.makedirs(save_dir, exist_ok=True)
    # Build the filename (based on the video directory name and a timestamp)
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    txt_filename = f"{video_dir}_{timestamp}.txt"
    txt_dir = os.path.join(save_dir, "Template", video_name)
    os.makedirs(txt_dir, exist_ok=True)
    txt_path = os.path.join(txt_dir, txt_filename)
    # Prepare the content to save (with a header)
    content = f"""Video Analysis Result
=====================================
Video file: {video_path}
Analysis time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
=====================================
{response_text}
"""
    # Write to file
    try:
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"\n✅ Analysis result saved to: {txt_path}")
        return txt_path
    except Exception as e:
        print(f"\n❌ Failed to save TXT file: {e}")
        return None
def save_result_to_json(response_text, base_dir, video_dir):
    """Save the analysis result as a JSON file"""
    import json
    import re
    # Create the save directory
    json_dir = os.path.join(base_dir, "llm")
    os.makedirs(json_dir, exist_ok=True)
    # Build the filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_filename = f"{video_dir}_{timestamp}.json"
    json_path = os.path.join(json_dir, json_filename)
    try:
        # Try to extract the JSON content from the response text.
        # It is usually wrapped between ```json and ```, or the response is raw JSON.
        json_match = re.search(r'```json\s*(.*?)\s*```', response_text, re.DOTALL)
        if json_match:
            json_content = json_match.group(1)
        else:
            # No ```json marker found; try to parse the whole response
            json_content = response_text
        # Try to parse the JSON
        try:
            parsed_json = json.loads(json_content)
        except json.JSONDecodeError:
            # Parsing failed; save the whole response as text
            parsed_json = {
                "raw_response": response_text,
                "analysis_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "video_dir": video_dir,
                "note": "JSON parsing failed; raw response saved"
            }
        # Write to file
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(parsed_json, f, ensure_ascii=False, indent=2)
        print(f"\n✅ JSON result saved to: {json_path}")
        return json_path
    except Exception as e:
        print(f"\n❌ Failed to save JSON file: {e}")
        # If saving failed, try to save the raw text instead
        try:
            fallback_path = json_path.replace('.json', '_raw.txt')
            with open(fallback_path, 'w', encoding='utf-8') as f:
                f.write(response_text)
            print(f"\n⚠️ Raw response saved to: {fallback_path}")
            return fallback_path
        except Exception as e2:
            print(f"\n❌ Saving the raw response also failed: {e2}")
            return None
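A minimal usage sketch for save_result_to_json, assuming a fenced LLM response; the response text and names below are made up for illustration:

sample_response = 'Analysis follows:\n```json\n{"summary": "demo"}\n```'
save_result_to_json(sample_response, "/root/autodl-tmp/video_llm", "demo_video")
# Writes /root/autodl-tmp/video_llm/llm/demo_video_<timestamp>.json containing {"summary": "demo"}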
# The following code runs only when this file is executed directly
if __name__ == "__main__":

View File

@@ -223,9 +223,11 @@ for i ,video_dir in enumerate(video_dirs):
    print(full_response)
    # Save the result as a TXT file
    txt_file_path = save_result_to_txt(full_response, base_dir , video_dir)
    txt_file_path = save_result_to_txt(full_response, base_dir, video_dir)
    # Save the result as a JSON file
    json_file_path = save_result_to_json(full_response, base_dir, video_dir)
    # Save the usage info
    usage_info_txt = save_usage_info_to_txt(usage_info, total_duration, money, base_dir , video_dir)
    usage_info_txt = save_usage_info_to_txt(usage_info, total_duration, money, base_dir, video_dir)
    # Print the usage info
    if usage_info:

View File

@@ -15,6 +15,7 @@ import re
from pathlib import Path
from datetime import datetime
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
# Add the current directory to sys.path so the OCR module can be imported
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
@@ -60,32 +61,7 @@ class BatchSubtitleExtractor:
            return int(match.group(1))
        return 1
    def _adjust_timestamps(self, results, segment_number):
        """
        Adjust timestamps according to the segment number
        Args:
            results: OCR result dict
            segment_number: segment number
        Returns:
            dict: the adjusted results
        """
        if segment_number == 1:
            # Segment 1 needs no time adjustment
            return results
        # Compute the time offset (30 seconds per segment)
        time_offset = (segment_number - 1) * 30
        logger.info(f"Segment number: {segment_number}, time offset: +{time_offset}")
        # Shift every subtitle's timestamp
        for subtitle in results['subtitles']:
            if 'timestamp' in subtitle:
                subtitle['timestamp'] += time_offset
        return results
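As a worked example of the offset removed above: for segment_number=3, time_offset = (3-1)*30 = 60, so a subtitle stamped 12s inside the segment becomes 72s in the full video. The same 30-seconds-per-segment shift reappears in the new merge script later in this commit (the i*30 adjustment in merge_segments).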
    def find_video_files(self, input_dir):
        """Find all video files in the directory"""
@@ -188,8 +164,7 @@
            subtitle_position=kwargs.get('position', 'bottom')
        )
        # Adjust timestamps according to the segment number
        results = self._adjust_timestamps(results, segment_number)
        # Create the OCR directory
        video_ocr_dir.mkdir(parents=True, exist_ok=True)
@@ -239,19 +214,20 @@
            'segment_number': segment_number
        }
    def extract_batch(self, input_dir, output_dir, **kwargs):
    def extract_batch(self, input_dir, output_dir, max_workers=4, **kwargs):
        """
        Batch subtitle extraction (serial processing)
        Batch subtitle extraction (parallel processing)
        Args:
            input_dir: input directory or file
            output_dir: output directory
            max_workers: maximum number of parallel worker threads
            **kwargs: additional parameters
        Returns:
            dict: batch processing results
        """
        logger.info(f"Starting batch subtitle extraction (serial processing)")
        logger.info(f"Starting batch subtitle extraction (parallel processing, max workers: {max_workers})")
        logger.info(f"Input: {input_dir}")
        logger.info(f"Output directory: {output_dir}")
        logger.info(f"OCR engine: {self.ocr_engine}")
@@ -274,16 +250,39 @@
logger.info(f"找到 {len(video_files)} 个视频文件")
results = []
completed_count = 0
# 串行处理所有视频文件
for i, video_file in enumerate(video_files, 1):
logger.info(f"处理第 {i}/{len(video_files)} 个视频")
result = self.extract_single_video(video_file, output_dir, **kwargs)
results.append(result)
# 并行处理所有视频文件
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_video = {
executor.submit(self.extract_single_video, video_file, output_dir, **kwargs): video_file
for video_file in video_files
}
# 显示进度
progress = i / len(video_files) * 100
logger.info(f"批量处理进度: {progress:.1f}% ({i}/{len(video_files)})")
# 处理完成的任务
for future in as_completed(future_to_video):
video_file = future_to_video[future]
completed_count += 1
try:
result = future.result()
results.append(result)
# 显示进度
progress = completed_count / len(video_files) * 100
logger.info(f"批量处理进度: {progress:.1f}% ({completed_count}/{len(video_files)}) - {video_file.name}")
except Exception as e:
error_msg = f"处理视频 {video_file} 时出错: {str(e)}"
logger.error(error_msg)
results.append({
'video_path': str(video_file),
'success': False,
'error': error_msg,
'process_time': 0,
'segment_number': self._extract_segment_number(video_file.stem)
})
total_time = time.time() - start_time
@@ -357,6 +356,8 @@ def main():
parser.add_argument("--position", default="full",
choices=["full", "center", "bottom"],
help="字幕区域位置 (full=全屏, center=居中0.5-0.8, bottom=居下0.7-1.0)")
parser.add_argument("-w", "--workers", type=int, default=4,
help="并行处理线程数 (默认: 4)")
args = parser.parse_args()
@@ -371,6 +372,7 @@ def main():
    result = batch_extractor.extract_batch(
        input_dir=args.input,
        output_dir=args.output,
        max_workers=args.workers,
        interval=args.interval,
        confidence=args.confidence,
        formats=args.formats,
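For reference, a possible command line using the new flag. This is a hedged sketch: the script filename and the -i/-o input/output flags are assumptions, while -w/--workers and --position appear in the diff above:

python batch_subtitle_extractor.py -i /root/autodl-tmp/video_processed -o /root/autodl-tmp/ocr_out -w 8 --position bottom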

View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频片段整合脚本
用于整合多个视频分析片段文件到一个完整的分析结果
"""
import json
import os
import glob
from datetime import datetime
def extract_json_from_file(file_path):
    """Extract the JSON content from a file"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Locate the start and end of the JSON content
        start_marker = "```json"
        end_marker = "```"
        start_pos = content.find(start_marker)
        if start_pos == -1:
            print(f"Warning: no JSON marker found in file {file_path}")
            return None
        # Find the start of the JSON object
        json_start = content.find('{', start_pos)
        if json_start == -1:
            print(f"Warning: no JSON object found in file {file_path}")
            return None
        # Find the end of the JSON content (rfind returns -1, not 0, when the marker is missing)
        json_end = content.rfind(end_marker)
        if json_end == -1 or json_end <= json_start:
            print(f"Warning: no closing JSON marker found in file {file_path}")
            return None
        # Extract the JSON string
        json_str = content[json_start:json_end]
        print(f"json_str:{json_str}")
        # Parse the JSON
        try:
            json_data = json.loads(json_str)
            return json_data
        except json.JSONDecodeError as e:
            print(f"Error: failed to parse the JSON in file {file_path}: {e}")
            print("Attempting to repair the JSON manually...")
    except Exception as e:
        print(f"Error: failed to read file {file_path}: {e}")
    return None
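The manual repair step announced above is not implemented yet (the commit message flags this: the merge fails on malformed LLM JSON). A minimal sketch of what it could look like, assuming the common failure modes are trailing commas and truncated output; the helper name is hypothetical and not part of the commit:

import re  # used only by this hypothetical helper

def try_repair_json(json_str):
    """Best-effort repair of common LLM JSON defects (hypothetical)."""
    # Drop trailing commas before a closing brace/bracket, e.g. {"a": 1,} -> {"a": 1}
    candidate = re.sub(r',\s*([}\]])', r'\1', json_str)
    # Close brackets/braces left open by a truncated response.
    # Naive: ignores brackets inside string values and interleaved nesting.
    candidate += ']' * max(candidate.count('[') - candidate.count(']'), 0)
    candidate += '}' * max(candidate.count('{') - candidate.count('}'), 0)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None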
def merge_segments(segment_files):
    """Merge multiple segment files"""
    merged_data = {
        "total_Oral broadcasting": "",
        "summary": "",
        "content": []
    }
    # Sort by filename so segments are merged in the correct order
    segment_files.sort()
    for i, file_path in enumerate(segment_files):
        print(f"Processing segment {i+1}: {os.path.basename(file_path)}")
        json_data = extract_json_from_file(file_path)
        if json_data is None:
            continue
        # Merge the voice-over content
        if "total_Oral broadcasting" in json_data:
            if merged_data["total_Oral broadcasting"]:
                merged_data["total_Oral broadcasting"] += " " + json_data["total_Oral broadcasting"]
            else:
                merged_data["total_Oral broadcasting"] = json_data["total_Oral broadcasting"]
        # Merge the summaries
        if "summary" in json_data:
            if merged_data["summary"]:
                merged_data["summary"] += " " + json_data["summary"]
            else:
                merged_data["summary"] = json_data["summary"]
        # Merge the content entries
        if "content" in json_data:
            # Shift timestamps by 30 seconds per preceding segment
            if i >= 1:
                for item in json_data["content"]:
                    item["start"] = item["start"] + i*30
                    item["end"] = item["end"] + i*30
            # Renumber each segment's content entries consecutively
            for item in json_data["content"]:
                item["id"] = len(merged_data["content"]) + 1
                merged_data["content"].append(item)
    return merged_data
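Because the commit message requires ids and timelines to stay aligned after merging, a small sanity check could be run over the merged result. This helper is a hypothetical addition, not part of the commit:

def check_alignment(merged_data):
    """Check that ids are consecutive from 1 and the timeline never runs backwards."""
    last_start = -1
    for expected_id, item in enumerate(merged_data["content"], start=1):
        assert item["id"] == expected_id, f"id {item['id']} != expected {expected_id}"
        assert item["start"] <= item["end"], f"inverted span at id {expected_id}"
        assert item["start"] >= last_start, f"timeline regressed at id {expected_id}"
        last_start = item["start"]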
def save_merged_result(merged_data, output_file):
    """Save the merged result to a JSON file"""
    try:
        # # Add metadata to the JSON
        # json_data = {
        #     "metadata": {
        #         "video_file": "/root/autodl-tmp/video_processed/广州广之旅国际旅行社股份有限公司",
        #         "analysis_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        #         "merge_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        #     },
        #     "data": merged_data
        # }
        # Save as a plain JSON file
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(merged_data, f, ensure_ascii=False, indent=4)
        print(f"Merged result saved to: {output_file}")
        return True
    except Exception as e:
        print(f"Error: failed to save the merged result: {e}")
        return False
def main():
    """Main entry point"""
    # Working directory (currently hard-coded)
    current_dir = "/root/autodl-tmp/video_llm/Template/牛管家"
    print(current_dir)
    # Find all segment files
    pattern = os.path.join(current_dir, "*.txt")
    segment_files = glob.glob(pattern)
    # if not segment_files:
    #     print("Error: no segment files found")
    #     return
    # print(f"Found {len(segment_files)} segment files:")
    # #frame = extract_json_from_file(segment_files[1])
    # #print(frame["content"][0]["id"])
    # # print(frame)
    # # for file_path in segment_files:
    # #     print(f" - {os.path.basename(file_path)}")
    # for file_path in segment_files:
    #     frame = extract_json_from_file(file_path)
    #     print(frame)
    # if __name__ == "__main__":
    #     main()
    # Merge the segments
    print("\nStarting to merge segments...")
    merged_data = merge_segments(segment_files)
    if not merged_data["content"]:
        print("Error: merge failed, no valid content")
        return
    # Build the output filename
    base_name = "牛管家"
    output_file = os.path.join(current_dir, f"{base_name}_merged_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
    # Save the merged result
    if save_merged_result(merged_data, output_file):
        print("\nMerge complete!")
        print(f"Total segments: {len(segment_files)}")
        print(f"Total content entries: {len(merged_data['content'])}")
        print(f"Output file: {os.path.basename(output_file)}")
        # Show statistics
        print("\nStatistics:")
        print(f"- Voice-over content length: {len(merged_data['total_Oral broadcasting'])} characters")
        print(f"- Summary length: {len(merged_data['summary'])} characters")
        print(f"- Number of content entries: {len(merged_data['content'])}")
if __name__ == "__main__":
    main()