# video_template_gen/code/copy_video.py
#
# 494 lines
# 18 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from openai import OpenAI
import os
import base64
import time
from datetime import datetime
from save_usage_info import save_usage_info_to_txt, save_simple_usage_info
# Base64 编码格式
def encode_video(video_path):
    """Return the content of the file at ``video_path`` as a Base64 string.

    The file is opened in binary mode and read in a single call; the
    Base64 bytes are decoded as UTF-8 so the result is a ``str``.

    Args:
        video_path: Path of the file to encode.

    Returns:
        The Base64 text of the file content (``""`` for an empty file).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(video_path, "rb") as video_file:
        return base64.b64encode(video_file.read()).decode("utf-8")
def encode_audio(audio_path):
    """Return the content of the file at ``audio_path`` as a Base64 string.

    The file is opened in binary mode and read in a single call; the
    Base64 bytes are decoded as UTF-8 so the result is a ``str``.

    Args:
        audio_path: Path of the file to encode.

    Returns:
        The Base64 text of the file content (``""`` for an empty file).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(audio_path, "rb") as audio_file:
        return base64.b64encode(audio_file.read()).decode("utf-8")
def read_txt_file(txt_path):
    """Read a UTF-8 text file and return its content.

    This is a best-effort reader: any failure is reported on stdout and
    an empty string is returned instead of raising.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The file content, or ``""`` if the file is missing or cannot be
        read.
    """
    # Keep the try body limited to the I/O so that a failure while printing
    # the progress messages cannot discard content that was read correctly.
    try:
        with open(txt_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except FileNotFoundError:
        print(f"错误: 找不到文件 {txt_path}")
        return ""
    except Exception as e:
        print(f"读取文件时出错: {e}")
        return ""

    print(f"成功读取txt文件: {txt_path}")
    print(f"文件内容长度: {len(content)} 字符")
    return content
def read_json_file(json_path):
    """Read and parse a UTF-8 JSON file.

    This is a best-effort reader: any failure is reported on stdout and
    ``None`` is returned instead of raising.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The parsed JSON value, or ``None`` if the file is missing, is not
        valid JSON, or cannot be read.
    """
    # Imported before the try block so the ``except json.JSONDecodeError``
    # clause below never refers to a name that is not bound yet.
    import json

    try:
        with open(json_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
    except FileNotFoundError:
        print(f"错误: 找不到文件 {json_path}")
        return None
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return None
    except Exception as e:
        print(f"读取JSON文件时出错: {e}")
        return None

    print(f"成功读取JSON文件: {json_path}")
    return data
def format_speech_json(speech_data):
    """Format speech-to-text JSON data as plain text (SenseVoice supported).

    Three dict layouts are recognised, checked in this order:

    * ``raw_result`` (new SenseVoice format): the ``text`` of every item is
      stripped of ``<|...|>`` tags, whitespace-normalised and joined with
      spaces; ``model``, ``transcribe_time`` and ``file_path`` are appended
      when present.
    * ``clean_text`` (old SenseVoice format): the text plus optional
      ``model``, ``transcribe_time``, ``emotions`` and ``events``.
    * ``continuous_text`` (subtitle-extractor format, fallback): the text
      plus optional ``stats``.

    Args:
        speech_data: Parsed JSON data, normally a dict.

    Returns:
        ``""`` when ``speech_data`` is falsy; otherwise a header line
        followed by one line per extracted field. A truthy non-dict value
        yields the header line only.
    """
    # Local import: ``re`` is not imported in the visible file header.
    import re

    if not speech_data:
        return ""

    formatted_text = "【口播转文字内容】\n"
    if not isinstance(speech_data, dict):
        return formatted_text

    if 'raw_result' in speech_data:
        # New SenseVoice format: collect the text of every result item.
        raw_result = speech_data['raw_result']
        if isinstance(raw_result, list) and raw_result:
            # Compiled once instead of re-importing/re-resolving per item.
            tag_pattern = re.compile(r'<\|[^|]+\|>')
            all_texts = []
            for item in raw_result:
                if not isinstance(item, dict):
                    continue
                text = item.get('text')
                # Skip missing or non-string text (e.g. JSON null), which
                # would otherwise make the regex substitution raise.
                if not isinstance(text, str):
                    continue
                # Remove the special tags, then collapse whitespace runs.
                clean_text = ' '.join(tag_pattern.sub('', text).split())
                if clean_text:
                    all_texts.append(clean_text)
            if all_texts:
                formatted_text += f"完整转录文本: {' '.join(all_texts)}\n"
        # Basic information
        if 'model' in speech_data:
            formatted_text += f"转录模型: {speech_data['model']}\n"
        if 'transcribe_time' in speech_data:
            formatted_text += f"转录耗时: {speech_data['transcribe_time']:.3f}\n"
        if 'file_path' in speech_data:
            formatted_text += f"音频文件: {speech_data['file_path']}\n"
    elif 'clean_text' in speech_data:
        # Old SenseVoice format, kept for compatibility.
        formatted_text += f"完整转录文本: {speech_data['clean_text']}\n"
        if 'model' in speech_data:
            formatted_text += f"转录模型: {speech_data['model']}\n"
        if 'transcribe_time' in speech_data:
            formatted_text += f"转录耗时: {speech_data['transcribe_time']:.3f}\n"
        # Emotion analysis
        if speech_data.get('emotions'):
            emotions = [emotion.get('emotion', '') for emotion in speech_data['emotions']]
            formatted_text += f"情绪分析: {', '.join(emotions)}\n"
        # Background audio events
        if speech_data.get('events'):
            events = [event.get('event', '') for event in speech_data['events']]
            formatted_text += f"音频事件: {', '.join(events)}\n"
    elif 'continuous_text' in speech_data:
        # Subtitle-extractor format (fallback).
        formatted_text += f"完整文本: {speech_data['continuous_text']}\n"
        if 'stats' in speech_data:
            stats = speech_data['stats']
            formatted_text += f"统计信息: 检测数量{stats.get('filtered_detections', 0)}个,"
            formatted_text += f"平均置信度{stats.get('average_confidence', 0):.3f}\n"

    return formatted_text
def format_whisper_json(whisper_data):
    """Format Whisper speech-to-text JSON data as a per-segment timeline.

    Args:
        whisper_data: Parsed JSON data, normally a dict with a
            ``segments`` list whose items may carry ``id``, ``start``,
            ``end`` and ``text``.

    Returns:
        ``""`` when ``whisper_data`` is falsy; otherwise a header line,
        followed (when there is at least one segment) by a timeline with
        one line per segment. Missing segment fields fall back to ``0``
        (``id``, ``start``, ``end``) or ``''`` (``text``).
    """
    if not whisper_data:
        return ""

    parts = ["【Whisper口播转文字内容】\n"]
    if isinstance(whisper_data, dict):
        # ``.get`` plus truthiness also covers ``"segments": null``, which
        # would make a ``len()`` call raise.
        segments = whisper_data.get('segments')
        if segments:
            # Detailed timeline: every segment is listed.
            parts.append("\n详细时间轴:\n")
            for segment in segments:
                segment_id = segment.get('id', 0)
                start_time = segment.get('start', 0)
                end_time = segment.get('end', 0)
                text = segment.get('text', '')
                parts.append(
                    f" id:{segment_id}, start:{start_time:.2f}, end:{end_time:.2f}, text:{text}\n"
                )
    return "".join(parts)
def format_ocr_json(ocr_data):
    """Format OCR subtitle JSON data as plain text.

    Only the ``continuous_text`` field is emitted. The OCR engine name and
    the per-subtitle timeline were disabled (commented out) in the original
    code and are intentionally not part of the output.

    Args:
        ocr_data: Parsed JSON data, normally a dict in the
            subtitle-extractor format.

    Returns:
        ``""`` when ``ocr_data`` is falsy; otherwise a header line,
        followed by the full subtitle text when ``continuous_text`` is
        present.
    """
    if not ocr_data:
        return ""

    formatted_text = "【OCR字幕识别内容】\n"
    if isinstance(ocr_data, dict) and 'continuous_text' in ocr_data:
        formatted_text += f"完整字幕文本: {ocr_data['continuous_text']}\n"
    return formatted_text
def format_clip_json(clip_data):
    """Format video scene-transition analysis JSON data as plain text.

    Args:
        clip_data: Parsed JSON data, normally a dict with optional
            ``video_name``, ``analysis_time``, ``total_scenes`` and a
            ``scenes`` list whose items may carry ``start_time``,
            ``end_time``, ``duration`` and ``type``.

    Returns:
        ``""`` when ``clip_data`` is falsy; otherwise a header line, the
        available basic fields, and one numbered section per scene
        (numbering starts at 1), each followed by a blank line.
    """
    if not clip_data:
        return ""

    parts = ["【视频转场分析内容】\n"]
    if isinstance(clip_data, dict):
        # Basic video information
        if 'video_name' in clip_data:
            parts.append(f"视频名称: {clip_data['video_name']}\n")
        if 'analysis_time' in clip_data:
            parts.append(f"分析时间: {clip_data['analysis_time']}\n")
        if 'total_scenes' in clip_data:
            parts.append(f"检测到场景数: {clip_data['total_scenes']}\n")

        # ``.get`` plus truthiness also covers ``"scenes": null``, which
        # would make a ``len()`` call raise.
        scenes = clip_data.get('scenes')
        if scenes:
            # Detailed per-scene information
            parts.append("\n详细场景信息:\n")
            for i, scene in enumerate(scenes, 1):
                parts.append(f"scenes {i}:\n")
                parts.append(f" start_time: {scene.get('start_time', 0):.2f}\n")
                parts.append(f" end_time: {scene.get('end_time', 0):.2f}\n")
                parts.append(f" duration: {scene.get('duration', 0):.2f}\n")
                parts.append(f" type: {scene.get('type')}\n")
                parts.append("\n")
    return "".join(parts)
def save_result_to_txt(response_text, video_path, save_dir="/root/autodl-tmp/video_llm"):
    """Save an analysis result as a timestamped TXT file.

    The file is named ``<video stem>_analysis_<YYYYmmdd_HHMMSS>.txt`` and
    contains a small header (video path and analysis time) followed by
    ``response_text``.

    Args:
        response_text: The text to store.
        video_path: Path of the analysed video; its base name without
            extension is used in the file name and the full path is
            written into the header.
        save_dir: Directory to write into; created if it does not exist.

    Returns:
        The path of the written file, or ``None`` if the directory or the
        file could not be created (the error is printed).
    """
    # Take a single timestamp so the file name and the header always agree.
    now = datetime.now()

    # File name is based on the video file name and the timestamp.
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    txt_filename = f"{video_name}_analysis_{timestamp}.txt"
    txt_path = os.path.join(save_dir, txt_filename)

    # Content to save, with the header information in front.
    content = f"""视频分析结果
=====================================
视频文件: {video_path}
分析时间: {now.strftime('%Y-%m-%d %H:%M:%S')}
=====================================
{response_text}
"""

    # Directory creation is part of the guarded section so that every
    # save failure is reported the same way (message + ``None``).
    try:
        os.makedirs(save_dir, exist_ok=True)
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(content)
    except Exception as e:
        print(f"\n❌ 保存TXT文件失败: {e}")
        return None

    print(f"\n✅ 分析结果已保存到: {txt_path}")
    return txt_path
# When True the completion is requested as a stream of chunks.
STREAM_MODE = True

# File path configuration
video_path = "/root/autodl-tmp/video/深圳青旅国际旅行社_compressed.mp4"
#audio_path = "/root/autodl-tmp/video2audio/sample_demo_6.wav"
#txt_path = "/root/autodl-tmp/hot_video_analyse/source/example_reference.txt"  # sample reference document

# JSON file path configuration
speech_json_path = "/root/autodl-tmp/video_sence/深圳青旅国际旅行社_sensevoice.json"  # speech-to-text JSON file
ocr_json_path = "/root/autodl-tmp/video_cnocr/深圳青旅国际旅行社_subtitles.json"  # OCR subtitle JSON file
#clip_json_path = "/root/autodl-tmp/02_VideoSplitter/VideoSplitter_output/shou_gonglve_3_scenes.json"
whisper_json_path = "/root/autodl-tmp/video_whisper/深圳青旅国际旅行社_transcript.json"  # Whisper transcript JSON file
ocr_txt_path = "/root/autodl-tmp/video_cnocr/深圳青旅国际旅行社_subtitles_processed.txt"

# Encode the video; the timer below also covers reading and formatting the
# reference files, up to the final "encoding finished" message.
print("开始编码文件...")
encode_start_time = time.time()
base64_video = encode_video(video_path)
#base64_audio = encode_audio(audio_path)
#txt_content = read_txt_file(txt_path)

# Read the JSON files
print("读取JSON文件...")
speech_data = read_json_file(speech_json_path)
ocr_data = read_json_file(ocr_json_path)
#clip_data = read_json_file(clip_json_path)
whisper_data = read_json_file(whisper_json_path)

# Format the JSON content
speech_content = format_speech_json(speech_data)
#ocr_content = format_ocr_json(ocr_data)
#clip_content = format_clip_json(clip_data)
whisper_content = format_whisper_json(whisper_data)

# The processed OCR text replaces the formatted OCR JSON. Read it as UTF-8
# explicitly (like the other readers in this file) so the Chinese text does
# not depend on the platform's default encoding.
with open(ocr_txt_path, 'r', encoding='utf-8') as file:
    ocr_content = file.read()

# Merge the reference content (speech and clip content are currently not
# included).
txt_content = ""
# if speech_content:
#     txt_content += speech_content + "\n\n"
if ocr_content:
    txt_content += ocr_content + "\n\n"
# if clip_content:
#     txt_content += clip_content + "\n\n"
if whisper_content:
    txt_content += whisper_content + "\n\n"
print(f"合并后的参考内容长度: {len(txt_content)} 字符")
print(txt_content)

encode_end_time = time.time()
encode_duration = encode_end_time - encode_start_time
print(f"文件编码完成,耗时: {encode_duration:.2f}")
# Instruction prompt for the model: task description, the required JSON
# output format and additional notes. It is embedded into the final user
# message further down in this script.
prompt_text = """🎥 **抖音短视频内容分析专家**
## 任务背景
您是一位经验丰富的视频导演和编辑需要基于以上OCR和Whisper的两个时间轴数据和视频内容。为视频写一个完整、流畅的脚本。
请对这个抖音短视频进行详细的内容分析,重点关注以下两个方面:
## 🎤 一、口播内容提取
请仔细听取视频中的语音内容,完整转录:
- **完整口播转录**:逐字逐句转录所有口语表达
- **语音时长**:估算总的讲话时长
## 📝 二、字幕文字识别
请识别视频画面中出现的所有文字内容:
- **屏幕字幕**:视频中显示的字幕文字(包括自动字幕和手动添加的字幕)
- **标题文字**:视频开头、中间、结尾出现的大标题
## 📊 输出格式要求
## 视频内容分析
请按照以下JSON格式输出视频描述
{
"total_Oral broadcasting":"请你生成一个完整的口播内容。",
"summary": "请用一句话总结视频的核心内容,突出视频的主要卖点和价值主张",
"content": [
{
"id": 跟随Whisper口播转文字内容中的id,
"start": 跟随Whisper口播转文字内容中的start,
"end": 跟随Whisper口播转文字内容中的end,
"talk": "请将对应时间的口播或字幕信息,填入此",
"subtitles": "跟随OCR字幕识别内容的文本",
"description": "跳转到视频对应时间,将视频对应时间的图片,描述这个镜头的画面内容、人物动作、场景特点等。不要重复描述。"
},
]
}
## 输出要求
1. summary用一句话概括视频核心内容突出主要卖点
2. content的时间轴要与whisper的保持一致
2. content按时间顺序交替描述镜头和转场
描述:
* id镜头序号从1开始递增
* start开始时间精确到小数点后一位
* end结束时间精确到小数点后一位
* talk该镜头中的对话或文字内容
* subtitles该镜头中的字幕内容
* description详细描述镜头内容包括
- 画面构图和场景
- 人物动作和表情
- 重要道具和元素
- 特殊效果和转场
## 注意事项
1. 保持描述简洁明了,但要有足够的细节
2. 突出视频的亮点和特色
3. 确保时间戳的准确性
4. 对话内容要符合视频画面
5. 整体风格要统一连贯
6. 每个镜头的描述要包含关键信息
请根据以上要求分析视频并输出JSON格式的描述。
请开始详细分析这个抖音短视频:"""
# Read the API key from the environment instead of hard-coding it in the
# source: a key committed to the repository is exposed to everyone with read
# access and should be treated as leaked (rotate the previously committed key).
dashscope_api_key = os.getenv("DASHSCOPE_API_KEY")
if not dashscope_api_key:
    raise RuntimeError(
        "DASHSCOPE_API_KEY is not set; export your API key in this "
        "environment variable before running the script."
    )

client = OpenAI(
    api_key=dashscope_api_key,
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# Build the content list of the user message.
content_list = [
    {
        # When passing a video file directly, "type" must be "video_url".
        "type": "video_url",
        "video_url": {"url": f"data:video/mp4;base64,{base64_video}"},
    }
    # ,
    # {
    #     "type": "audio_url",
    #     "audio_url": {"url": f"data:audio/wav;base64,{base64_audio}"},
    # }
]

# Main prompt text: the merged reference material followed by the
# instruction prompt.
# NOTE(review): the literal "+" between the two placeholders ends up in the
# text sent to the model; it looks like a leftover from string
# concatenation - confirm before changing, since it alters the prompt.
prompt_text_with_references = f"""🎥 **抖音短视频内容分析专家**
## 📋 参考资料内容
【OCR转文字内容】
{txt_content}+{prompt_text}"""
content_list.append({
    "type": "text",
    "text": prompt_text_with_references
})

print(f"\n开始请求API...")
print(f"请求时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Stream模式: {STREAM_MODE}")
print(f"Content项目数量: {len(content_list)}")

# Record the start time of the API request.
api_start_time = time.time()
completion = client.chat.completions.create(
    model="qwen-omni-turbo",
    #model="/root/autodl-tmp/llm/Qwen-omni",
    messages=[
        {
            "role": "system",
            "content": "You are a helpful assistant."
            #"content": [{"type":"text","text": "You are a helpful assistant."}]
        },
        {
            "role": "user",
            "content": content_list
        }
    ],
    stream=STREAM_MODE,
    # Ask for a usage chunk only when streaming.
    stream_options={"include_usage": True} if STREAM_MODE else None,
    temperature=0.5
)
if STREAM_MODE:
    # Streaming output: collect the text deltas and join them at the end.
    response_parts = []
    usage_info = None
    money = {}
    # Time at which the first content token arrived.
    first_token_time = None
    print("正在生成回复...")
    for chunk in completion:
        if chunk.choices:
            delta = chunk.choices[0].delta
            if delta.content:
                # Report the latency of the first token once.
                if first_token_time is None:
                    first_token_time = time.time()
                    first_token_delay = first_token_time - api_start_time
                    print(f"首个token延迟: {first_token_delay:.2f}")
                response_parts.append(delta.content)
        elif chunk.usage is not None:
            # A chunk without choices carries the usage statistics. Guard
            # against a missing usage object and missing per-modality token
            # counts, which would otherwise raise when multiplied; a missing
            # count is treated as 0.
            usage_info = chunk.usage
            details = getattr(usage_info, "prompt_tokens_details", None)
            text_tokens = getattr(details, "text_tokens", None) or 0
            video_tokens = getattr(details, "video_tokens", None) or 0
            audio_tokens = getattr(details, "audio_tokens", None) or 0
            completion_tokens = usage_info.completion_tokens or 0
            # Cost estimate: token count times a per-1000-token rate.
            # The "momey" spelling of the keys is kept as-is because the
            # dict is handed to save_usage_info_to_txt, which is defined
            # outside this file.
            money["output_momey"] = completion_tokens * 0.0045 / 1000
            money["prompt_momey"] = text_tokens * 0.0004 / 1000
            money["video_momey"] = video_tokens * 0.0015 / 1000
            money["audio_momey"] = audio_tokens * 0.025 / 1000
            money["sum_momey"] = (
                money["output_momey"] + money["prompt_momey"]
                + money["video_momey"] + money["audio_momey"]
            )
            print(usage_info)
    full_response = "".join(response_parts)

    # Record the end time of the API request.
    api_end_time = time.time()
    total_duration = api_end_time - api_start_time

    # Print the complete response.
    print("\n" + "="*50)
    print("完整回复:")
    print("="*50)
    print(full_response)

    # Save the result as a TXT file.
    txt_file_path = save_result_to_txt(full_response, video_path)
    # Save the usage information.
    usage_info_txt = save_usage_info_to_txt(usage_info, total_duration, money, video_path)

    # Print the usage information.
    if usage_info:
        print("\n" + "="*50)
        print("📈 使用情况:")
        print("="*50)
        print(usage_info)
# else:
# # 非流式输出 - 直接输出完整响应
# api_end_time = time.time()
# total_duration = api_end_time - api_start_time
# print("非流式输出模式:")
# print("完整回复:")
# print("="*50)
# print(completion.choices[0].message.content)
# # 保存结果为TXT文件
# txt_file_path = save_result_to_txt(completion.choices[0].message.content + "total_duration:" + str(total_duration), video_path)
# # 输出时间统计信息
# print("\n" + "="*50)
# print("⏱️ 时间统计:")
# print("="*50)
# print(f"📁 文件编码时间: {encode_duration:.2f} 秒")
# print(f"🕐 API总响应时间: {total_duration:.2f} 秒")
# print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# # 输出使用情况信息
# if hasattr(completion, 'usage') and completion.usage:
# print("\n" + "="*50)
# print("📈 使用情况:")
# print("="*50)
# print(completion.usage)