from openai import OpenAI import os import base64 import time from datetime import datetime from token_counter import * # Base64 编码格式 def encode_video(video_path): with open(video_path, "rb") as video_file: return base64.b64encode(video_file.read()).decode("utf-8") def encode_audio(audio_path): with open(audio_path, "rb") as audio_file: return base64.b64encode(audio_file.read()).decode("utf-8") def read_txt_file(txt_path): """读取txt文件内容""" try: with open(txt_path, 'r', encoding='utf-8') as file: content = file.read() print(f"成功读取txt文件: {txt_path}") print(f"文件内容长度: {len(content)} 字符") return content except FileNotFoundError: print(f"错误: 找不到文件 {txt_path}") return "" except Exception as e: print(f"读取文件时出错: {e}") return "" def read_json_file(json_path): """读取JSON文件内容""" try: import json with open(json_path, 'r', encoding='utf-8') as file: data = json.load(file) print(f"成功读取JSON文件: {json_path}") return data except FileNotFoundError: print(f"错误: 找不到文件 {json_path}") return None except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") return None except Exception as e: print(f"读取JSON文件时出错: {e}") return None def format_speech_json(speech_data): """格式化口播转文字JSON数据(支持SenseVoice格式)""" if not speech_data: return "" formatted_text = "【口播转文字内容】\n" if isinstance(speech_data, dict): # 新SenseVoice格式 - 处理raw_result if 'raw_result' in speech_data: raw_result = speech_data['raw_result'] if isinstance(raw_result, list) and len(raw_result) > 0: # 提取所有文本内容 all_texts = [] for item in raw_result: if isinstance(item, dict) and 'text' in item: text = item['text'] # 清理SenseVoice的特殊标签 import re clean_text = re.sub(r'<\|[^|]+\|>', '', text) clean_text = ' '.join(clean_text.split()) if clean_text.strip(): all_texts.append(clean_text.strip()) if all_texts: formatted_text += f"完整转录文本: {' '.join(all_texts)}\n" # 基本信息 if 'model' in speech_data: formatted_text += f"转录模型: {speech_data['model']}\n" if 'transcribe_time' in speech_data: formatted_text += f"转录耗时: {speech_data['transcribe_time']:.3f}秒\n" if 'file_path' in speech_data: formatted_text += f"音频文件: {speech_data['file_path']}\n" # 旧SenseVoice格式(兼容) elif 'clean_text' in speech_data: formatted_text += f"完整转录文本: {speech_data['clean_text']}\n" if 'model' in speech_data: formatted_text += f"转录模型: {speech_data['model']}\n" if 'transcribe_time' in speech_data: formatted_text += f"转录耗时: {speech_data['transcribe_time']:.3f}秒\n" # 情绪分析 if 'emotions' in speech_data and speech_data['emotions']: emotions = [emotion.get('emotion', '') for emotion in speech_data['emotions']] formatted_text += f"情绪分析: {', '.join(emotions)}\n" # 背景事件 if 'events' in speech_data and speech_data['events']: events = [event.get('event', '') for event in speech_data['events']] formatted_text += f"音频事件: {', '.join(events)}\n" # 如果是字幕提取器的格式(备用) elif 'continuous_text' in speech_data: formatted_text += f"完整文本: {speech_data['continuous_text']}\n" if 'stats' in speech_data: stats = speech_data['stats'] formatted_text += f"统计信息: 检测数量{stats.get('filtered_detections', 0)}个," formatted_text += f"平均置信度{stats.get('average_confidence', 0):.3f}\n" return formatted_text def format_whisper_json(whisper_data): """格式化Whisper口播转文字JSON数据""" if not whisper_data: return "" formatted_text = "【Whisper口播转文字内容】\n" if isinstance(whisper_data, dict): # 基本信息 # 详细时间轴 - 显示所有片段 if 'segments' in whisper_data and len(whisper_data['segments']) > 0: formatted_text += "\n详细时间轴:\n" for segment in whisper_data['segments']: segment_id = segment.get('id', 0) start_time = segment.get('start', 0) end_time = segment.get('end', 0) text = segment.get('text', '') formatted_text += f" id:{segment_id}, start:{start_time:.2f}, end:{end_time:.2f}, text:{text}\n" return formatted_text def format_ocr_json(ocr_data): """格式化OCR字幕转文字JSON数据""" if not ocr_data: return "" formatted_text = "【OCR字幕识别内容】\n" # 如果是字幕提取器的格式 if isinstance(ocr_data, dict): # 显示使用的OCR引擎 # if 'ocr_engine' in ocr_data: # formatted_text += f"OCR引擎: {ocr_data['ocr_engine']}\n" if 'continuous_text' in ocr_data: formatted_text += f"完整字幕文本: {ocr_data['continuous_text']}\n" # if 'subtitles' in ocr_data and len(ocr_data['subtitles']) > 0: # formatted_text += "详细字幕时间轴:\n" # for subtitle in ocr_data['subtitles'][:10]: # 只显示前10个,避免过长 # timestamp = subtitle.get('timestamp', 0) # text = subtitle.get('text', '') # engine = subtitle.get('engine', '') # confidence = subtitle.get('confidence', 0) # formatted_text += f" {timestamp:.2f}s [{engine}|{confidence:.3f}]: {text}\n" # if len(ocr_data['subtitles']) > 10: # formatted_text += f" ... (还有{len(ocr_data['subtitles']) - 10}个字幕片段)\n" return formatted_text def format_clip_json(clip_data): """格式化视频转场分析JSON数据""" if not clip_data: return "" formatted_text = "【视频转场分析内容】\n" if isinstance(clip_data, dict): # 显示视频基本信息 if 'video_name' in clip_data: formatted_text += f"视频名称: {clip_data['video_name']}\n" if 'analysis_time' in clip_data: formatted_text += f"分析时间: {clip_data['analysis_time']}\n" if 'total_scenes' in clip_data: formatted_text += f"检测到场景数: {clip_data['total_scenes']} 个\n" # 详细场景信息 if 'scenes' in clip_data and len(clip_data['scenes']) > 0: formatted_text += "\n详细场景信息:\n" for i, scene in enumerate(clip_data['scenes'], 1): formatted_text += f"scenes {i}:\n" formatted_text += f" start_time: {scene.get('start_time', 0):.2f}秒\n" formatted_text += f" end_time: {scene.get('end_time', 0):.2f}秒\n" formatted_text += f" duration: {scene.get('duration', 0):.2f}秒\n" formatted_text += f" type: {scene.get('type')}\n" formatted_text += "\n" return formatted_text def save_result_to_txt(response_text, video_path, save_dir="/root/autodl-tmp/final_output"): """将分析结果保存为TXT文件""" # 创建保存目录 os.makedirs(save_dir, exist_ok=True) # 生成文件名(基于视频文件名和时间戳) video_name = os.path.splitext(os.path.basename(video_path))[0] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") txt_filename = f"{video_name}_analysis_{timestamp}.txt" txt_path = os.path.join(save_dir, txt_filename) # 准备保存内容(添加头部信息) content = f"""视频分析结果 ===================================== 视频文件: {video_path} 分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===================================== {response_text} """ # 保存到文件 try: with open(txt_path, 'w', encoding='utf-8') as f: f.write(content) print(f"\n✅ 分析结果已保存到: {txt_path}") return txt_path except Exception as e: print(f"\n❌ 保存TXT文件失败: {e}") return None STREAM_MODE = True # 文件路径配置 video_path = "/root/autodl-tmp/new/哈尔滨.mp4" #audio_path = "/root/autodl-tmp/video2audio/sample_demo_6.wav" #txt_path = "/root/autodl-tmp/hot_video_analyse/source/example_reference.txt" # 使用示例参考文档 # JSON文件路径配置 speech_json_path = "/root/autodl-tmp/new_sensevoice/哈尔滨_sensevoice.json" # 口播转文字JSON文件 ocr_json_path = "/root/autodl-tmp/new_cnocr/哈尔滨_subtitles.json" # OCR字幕转文字JSON文件 #clip_json_path = "/root/autodl-tmp/02_VideoSplitter/VideoSplitter_output/shou_gonglve_3_scenes.json" whisper_json_path = "/root/autodl-tmp/new_whisper/哈尔滨_transcript.json" # Whisper转文字JSON文件 ocr_txt_path = "/root/autodl-tmp/new_cnocr/哈尔滨_subtitles_processed.txt" # 编码文件 print("开始编码文件...") encode_start_time = time.time() base64_video = encode_video(video_path) #base64_audio = encode_audio(audio_path) #txt_content = read_txt_file(txt_path) #读取JSON文件内容 print("读取JSON文件...") speech_data = read_json_file(speech_json_path) ocr_data = read_json_file(ocr_json_path) #clip_data = read_json_file(clip_json_path) whisper_data = read_json_file(whisper_json_path) # 格式化JSON内容 speech_content = format_speech_json(speech_data) #ocr_content = format_ocr_json(ocr_data) #clip_content = format_clip_json(clip_data) whisper_content = format_whisper_json(whisper_data) with open(ocr_txt_path, 'r') as file: ocr_content = file.read() # # 合并内容 txt_content = "" # if speech_content: # txt_content += speech_content + "\n\n" if ocr_content: txt_content += ocr_content + "\n\n" # if clip_content: # txt_content += clip_content + "\n\n" if whisper_content: txt_content += whisper_content + "\n\n" print(f"合并后的参考内容长度: {len(txt_content)} 字符") print(txt_content) encode_end_time = time.time() encode_duration = encode_end_time - encode_start_time print(f"文件编码完成,耗时: {encode_duration:.2f} 秒") # 统计提示词token prompt_text = """🎥 **抖音短视频内容分析专家** ## 任务背景 您是一位经验丰富的视频导演和编辑,需要基于以上两个时间轴数据,和视频内容。为视频写一个完整、流畅的脚本。 请对这个抖音短视频进行详细的内容分析,重点关注以下三个方面: ## 🎤 一、口播内容提取 请仔细听取视频中的语音内容,完整转录: - **完整口播转录**:逐字逐句转录所有口语表达 - **语音时长**:估算总的讲话时长 ## 📝 二、字幕文字识别 请识别视频画面中出现的所有文字内容: - **屏幕字幕**:视频中显示的字幕文字(包括自动字幕和手动添加的字幕) - **标题文字**:视频开头、中间、结尾出现的大标题 ## 🎬 三、转场效果分析 请仔细观察视频中的转场效果,并且结合参考资料中的转场内容,请你整体分析一下视频。比如几个画面出现第一个转场等. 转场的time_start","time_end","textIdx"请严格按照参考资料中的口播内容的时间戳start,end,id填写,不要自己生成。 ## 📊 输出格式要求 ## 视频内容分析 请按照以下JSON格式输出视频描述: { "total_Oral broadcasting":"请你生成一个完整的口播内容。", "summary": "请用一句话总结视频的核心内容,突出视频的主要卖点和价值主张", "content": [ { "type": "cut", "scenes": 1, "time_start": 0.0, "time_end": 2.0, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,描述这个镜头的画面内容、人物动作、场景特点等。不要重复描述。" }, { "type": "cut", "scenes": 2, "time_start": 2.0, "time_end": 4.5, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,详细描述这个镜头的画面内容、人物动作、场景特点等。不要描述视频内容,只描述这个镜头的画面内容,不要重复描述。" }, { "type": "cut", "scenes": 3, "time_start": 4.5, "time_end": 6.0, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,详细描述这个镜头的画面内容、人物动作、场景特点等。不要描述视频内容,只描述这个镜头的画面内容,不要重复描述。" } ] } ## 输出要求 1. summary:用一句话概括视频核心内容,突出主要卖点 2. content:按时间顺序交替描述镜头和转场 - 镜头(lens)描述: * textIdx:镜头序号,从1开始递增 * time_start:开始时间(秒),精确到小数点后一位 * time_end:结束时间(秒),精确到小数点后一位 * talk:该镜头中的对话或文字内容 * description:详细描述镜头内容,包括: - 画面构图和场景 - 人物动作和表情 - 重要道具和元素 - 特殊效果和转场 ## 注意事项 1. 保持描述简洁明了,但要有足够的细节 2. 突出视频的亮点和特色 3. 确保时间戳的准确性 4. 对话内容要符合视频画面 5. 整体风格要统一连贯 6. 每个镜头的描述要包含关键信息 ## 示例内容描述 1. 镜头1: - 开场特写镜头,展示产品外观 - 画面从模糊到清晰,突出产品细节 - 背景音乐渐入,营造氛围 - 文字提示:"全新升级,品质保证" 2. 转场1-2: - 类型:平滑滑动 - 目的:自然过渡到使用场景 - 效果:画面从产品特写平滑滑向使用场景 3. 镜头2: - 中景展示使用场景 - 人物自然流畅的动作展示 - 光线明亮,突出产品效果 - 文字说明:"简单操作,轻松上手" 4. 转场2-3: - 类型:快速缩放 - 目的:突出产品核心功能 - 效果:画面快速聚焦到产品关键部位 5. 镜头3: - 特写展示产品核心功能 - 慢动作展示关键细节 - 画面色彩鲜明,对比强烈 - 文字强调:"专业性能,值得信赖" 请根据以上要求,分析视频并输出JSON格式的描述。 请开始详细分析这个抖音短视频:""" # 调用token统计功能 token_stats = analyze_input_tokens(video_path, txt_content, prompt_text) client = OpenAI( # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx" api_key="EMPTY", base_url="http://localhost:8000/v1", ) # 构建content列表 content_list = [ { # 直接传入视频文件时,请将type的值设置为video_url "type": "video_url", "video_url": {"url": f"data:video/mp4;base64,{base64_video}"}, } # , # { # "type": "audio_url", # "audio_url": {"url": f"data:audio/wav;base64,{base64_audio}"}, # } ] # 如果txt文件有内容,添加到content中 if txt_content.strip(): content_list.append({ "type": "text", "text": f"参考资料内容:\n{txt_content}\n\n", "need": "第一部分是视频内容,第二部分是视频的字幕时间轴内容,第三部分是口播的字幕时间轴内容" }) # 添加主要提示文本 content_list.append({ "type": "text", "text": """🎥 **抖音短视频内容分析专家** ## 任务背景 您是一位经验丰富的视频导演和编辑,需要基于以上两个时间轴数据,和视频内容。为视频写一个完整、流畅的脚本。 请对这个抖音短视频进行详细的内容分析,重点关注以下三个方面: ## 🎤 一、口播内容提取 请仔细听取视频中的语音内容,完整转录: - **完整口播转录**:逐字逐句转录所有口语表达 - **语音时长**:估算总的讲话时长 ## 📝 二、字幕文字识别 请识别视频画面中出现的所有文字内容: - **屏幕字幕**:视频中显示的字幕文字(包括自动字幕和手动添加的字幕) - **标题文字**:视频开头、中间、结尾出现的大标题 ## 🎬 三、转场效果分析 请仔细观察视频中的转场效果,并且结合参考资料中的转场内容,请你整体分析一下视频。比如几个画面出现第一个转场等. 转场的time_start","time_end","textIdx"请严格按照参考资料中的口播内容的时间戳start,end,id填写,不要自己生成。 ## 📊 输出格式要求 ## 视频内容分析 请按照以下JSON格式输出视频描述: { "total_Oral broadcasting":"请你生成一个完整的口播内容。", "summary": "请用一句话总结视频的核心内容,突出视频的主要卖点和价值主张", "content": [ { "type": "cut", "scenes": 1, "time_start": 0.0, "time_end": 2.0, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,描述这个镜头的画面内容、人物动作、场景特点等。不要重复描述。" }, { "type": "cut", "scenes": 2, "time_start": 2.0, "time_end": 4.5, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,详细描述这个镜头的画面内容、人物动作、场景特点等。不要描述视频内容,只描述这个镜头的画面内容,不要重复描述。" }, { "type": "cut", "scenes": 3, "time_start": 4.5, "time_end": 6.0, "talk": "请将对应时间的口播或字幕信息,填入此", "description": "跳转到视频对应时间,将视频对应时间的图片,详细描述这个镜头的画面内容、人物动作、场景特点等。不要描述视频内容,只描述这个镜头的画面内容,不要重复描述。" } ] } ## 输出要求 1. summary:用一句话概括视频核心内容,突出主要卖点 2. content:按时间顺序交替描述镜头和转场 - 镜头(lens)描述: * textIdx:镜头序号,从1开始递增 * time_start:开始时间(秒),精确到小数点后一位 * time_end:结束时间(秒),精确到小数点后一位 * talk:该镜头中的对话或文字内容 * description:详细描述镜头内容,包括: - 画面构图和场景 - 人物动作和表情 - 重要道具和元素 - 特殊效果和转场 ## 注意事项 1. 保持描述简洁明了,但要有足够的细节 2. 突出视频的亮点和特色 3. 确保时间戳的准确性 4. 对话内容要符合视频画面 5. 整体风格要统一连贯 6. 每个镜头的描述要包含关键信息 ## 示例内容描述 1. 镜头1: - 开场特写镜头,展示产品外观 - 画面从模糊到清晰,突出产品细节 - 背景音乐渐入,营造氛围 - 文字提示:"全新升级,品质保证" 2. 转场1-2: - 类型:平滑滑动 - 目的:自然过渡到使用场景 - 效果:画面从产品特写平滑滑向使用场景 3. 镜头2: - 中景展示使用场景 - 人物自然流畅的动作展示 - 光线明亮,突出产品效果 - 文字说明:"简单操作,轻松上手" 4. 转场2-3: - 类型:快速缩放 - 目的:突出产品核心功能 - 效果:画面快速聚焦到产品关键部位 5. 镜头3: - 特写展示产品核心功能 - 慢动作展示关键细节 - 画面色彩鲜明,对比强烈 - 文字强调:"专业性能,值得信赖" 请根据以上要求,分析视频并输出JSON格式的描述。 请开始详细分析这个抖音短视频:""" }) print(f"\n开始请求API...") print(f"请求时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"Stream模式: {STREAM_MODE}") print(f"Content项目数量: {len(content_list)}") # 记录API请求开始时间 api_start_time = time.time() completion = client.chat.completions.create( model="/root/autodl-tmp/llm/Qwen-omni", messages=[ { "role": "system", "content": [{"type":"text","text": "You are a helpful assistant."}] }, { "role": "user", "content": content_list } ], stream=STREAM_MODE, stream_options={"include_usage": True} if STREAM_MODE else None, temperature=0.5 ) if STREAM_MODE: # 流式输出 - 拼接完整回复 full_response = "" usage_info = None # 记录第一个token的时间 first_token_time = None token_count = 0 print("正在生成回复...") for chunk in completion: if chunk.choices: delta = chunk.choices[0].delta if delta.content: # 记录第一个token的时间 if first_token_time is None: first_token_time = time.time() first_token_delay = first_token_time - api_start_time print(f"首个token延迟: {first_token_delay:.2f} 秒") # 拼接内容 full_response += delta.content token_count += 1 # 实时显示(可选) #print(delta.content, end='', flush=True) else: # 保存使用情况信息 usage_info = chunk.usage # 记录API请求结束时间 api_end_time = time.time() total_duration = api_end_time - api_start_time # 输出完整的响应 print("\n" + "="*50) print("完整回复:") print("="*50) print(full_response) # 保存结果为TXT文件 txt_file_path = save_result_to_txt(full_response + "total_duration:" + str(total_duration), video_path) # 输出时间统计信息 print("\n" + "="*50) print("⏱️ 时间统计:") print("="*50) print(f"📁 文件编码时间: {encode_duration:.2f} 秒") if first_token_time: print(f"🚀 首个token延迟: {first_token_delay:.2f} 秒") generation_time = api_end_time - first_token_time print(f"⚡ 内容生成时间: {generation_time:.2f} 秒") print(f"🕐 API总响应时间: {total_duration:.2f} 秒") print(f"📊 生成token数量: {token_count}") if first_token_time and token_count > 0: tokens_per_second = token_count / generation_time print(f"🔥 生成速度: {tokens_per_second:.2f} tokens/秒") print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"输出总费用:{token_count*0.0045/1000:.4f}元") final_cost = token_stats['total_cost']+token_count*0.0045/1000 print(f"总费用:{final_cost:.4f}元") # 输出使用情况信息 if usage_info: print("\n" + "="*50) print("📈 使用情况:") print("="*50) print(usage_info) else: # 非流式输出 - 直接输出完整响应 api_end_time = time.time() total_duration = api_end_time - api_start_time print("非流式输出模式:") print("完整回复:") print("="*50) print(completion.choices[0].message.content) # 保存结果为TXT文件 txt_file_path = save_result_to_txt(completion.choices[0].message.content, video_path) # 输出时间统计信息 print("\n" + "="*50) print("⏱️ 时间统计:") print("="*50) print(f"📁 文件编码时间: {encode_duration:.2f} 秒") print(f"🕐 API总响应时间: {total_duration:.2f} 秒") print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 输出使用情况信息 if hasattr(completion, 'usage') and completion.usage: print("\n" + "="*50) print("📈 使用情况:") print("="*50) print(completion.usage)