#!/usr/bin/env python3
"""Send a short video + audio to a local Qwen-Omni (vLLM) endpoint for analysis,
while monitoring system/GPU/process memory at each stage of the pipeline."""

import base64
import os
import subprocess
import time
from datetime import datetime

import psutil
from openai import OpenAI


class MemoryMonitor:
    """Tracks system, GPU, and current-process memory across named checkpoints."""

    def __init__(self):
        self.checkpoints = []
        self.initial_memory = self.get_memory_info()

    def get_memory_info(self):
        """Return a snapshot dict of system, GPU, and process memory usage."""
        memory = psutil.virtual_memory()
        gpu_info = self.get_gpu_memory()
        process = psutil.Process()
        memory_info = process.memory_info()
        return {
            "timestamp": datetime.now().isoformat(),
            "system_memory_gb": memory.used / 1024**3,
            "system_memory_percent": memory.percent,
            "gpu_memory": gpu_info,
            "process_memory_mb": memory_info.rss / 1024 / 1024,
        }

    def get_gpu_memory(self):
        """Query per-GPU memory via nvidia-smi; return [] when unavailable.

        Returns a list of dicts (one per GPU) with total/used/free MB and
        a usage percentage rounded to two decimals.
        """
        try:
            result = subprocess.run(
                ['nvidia-smi',
                 '--query-gpu=memory.total,memory.used,memory.free',
                 '--format=csv,noheader,nounits'],
                capture_output=True, text=True, check=True)
            lines = result.stdout.strip().split('\n')
            gpu_info = []
            for i, line in enumerate(lines):
                parts = line.split(', ')
                if len(parts) == 3:
                    total, used, free = map(int, parts)
                    gpu_info.append({
                        "gpu_id": i,
                        "total_mb": total,
                        "used_mb": used,
                        "free_mb": free,
                        "usage_percent": round(used / total * 100, 2),
                    })
            return gpu_info
        except (OSError, subprocess.CalledProcessError, ValueError):
            # Best-effort: no NVIDIA driver, nvidia-smi missing, or
            # unparsable output — treat as "no GPUs visible".
            return []

    def checkpoint(self, name=""):
        """Record a named memory snapshot plus the delta from the previous one.

        The first checkpoint has ``memory_diff`` of None since there is
        nothing to diff against.
        """
        current_memory = self.get_memory_info()
        if self.checkpoints:
            last_memory = self.checkpoints[-1]["memory"]
            memory_diff = {
                "system_memory_gb": current_memory["system_memory_gb"] - last_memory["system_memory_gb"],
                "process_memory_mb": current_memory["process_memory_mb"] - last_memory["process_memory_mb"],
            }
            # Per-GPU used-memory delta; only GPUs present in both snapshots.
            gpu_diff = []
            if current_memory["gpu_memory"] and last_memory["gpu_memory"]:
                for i in range(min(len(current_memory["gpu_memory"]),
                                   len(last_memory["gpu_memory"]))):
                    current_gpu = current_memory["gpu_memory"][i]["used_mb"]
                    last_gpu = last_memory["gpu_memory"][i]["used_mb"]
                    gpu_diff.append({
                        "gpu_id": i,
                        "used_mb_diff": current_gpu - last_gpu,
                    })
            memory_diff["gpu_memory"] = gpu_diff
        else:
            memory_diff = None
        checkpoint = {
            "name": name,
            "memory": current_memory,
            "memory_diff": memory_diff,
        }
        self.checkpoints.append(checkpoint)
        return checkpoint

    def check_memory_risk(self):
        """Classify current system and GPU memory pressure as 低/中/高."""
        current = self.get_memory_info()
        # System memory risk thresholds: >90% high, >80% medium.
        sys_risk = "低"
        if current["system_memory_percent"] > 90:
            sys_risk = "高"
        elif current["system_memory_percent"] > 80:
            sys_risk = "中"
        # GPU risk is driven by the busiest GPU: >95% high, >85% medium.
        gpu_risk = "低"
        if current["gpu_memory"]:
            max_gpu_usage = max(gpu["usage_percent"] for gpu in current["gpu_memory"])
            if max_gpu_usage > 95:
                gpu_risk = "高"
            elif max_gpu_usage > 85:
                gpu_risk = "中"
        return {
            "system_risk": sys_risk,
            "gpu_risk": gpu_risk,
            "current_memory": current,
        }

    def print_memory_status(self, title=""):
        """Print a formatted memory-status report; return the risk dict."""
        current = self.get_memory_info()
        risk = self.check_memory_risk()
        print(f"\n{'='*50}")
        print(f"🔍 {title if title else '内存状态检查'}")
        print(f"{'='*50}")
        # System memory line with a risk icon.
        risk_icon = {"低": "✅", "中": "⚠️", "高": "🚨"}[risk["system_risk"]]
        print(f"💾 系统内存: {current['system_memory_gb']:.1f} GB ({current['system_memory_percent']:.1f}%) {risk_icon}")
        # One line per visible GPU.
        if current["gpu_memory"]:
            risk_icon = {"低": "✅", "中": "⚠️", "高": "🚨"}[risk["gpu_risk"]]
            for gpu in current["gpu_memory"]:
                print(f"🎮 GPU {gpu['gpu_id']}: {gpu['used_mb']:.0f}/{gpu['total_mb']:.0f} MB ({gpu['usage_percent']:.1f}%) {risk_icon}")
        # Current process RSS.
        print(f"🔧 当前进程: {current['process_memory_mb']:.1f} MB")
        return risk


def analyze_file_sizes(video_path, audio_path=None, txt_content=""):
    """Estimate the memory footprint of the request payload.

    Reports original and base64 sizes for the video/audio files plus the
    text content, and returns ``(total_estimated_mb, warnings)`` where
    ``warnings`` is a list of human-readable risk strings.
    """
    print(f"\n{'='*50}")
    print("📊 文件大小分析")
    print(f"{'='*50}")
    total_estimated_mb = 0
    warnings = []
    # Video file: base64 inflates by ~33%, and encoding holds both copies.
    if os.path.exists(video_path):
        video_size = os.path.getsize(video_path)
        video_size_mb = video_size / 1024 / 1024
        base64_size_mb = video_size_mb * 1.33  # base64 adds ~33%
        memory_estimate_mb = base64_size_mb * 2  # raw + encoded copies in memory
        print(f"🎥 视频文件: {os.path.basename(video_path)}")
        print(f" 原始大小: {video_size_mb:.2f} MB")
        print(f" Base64后: {base64_size_mb:.2f} MB")
        print(f" 内存估算: {memory_estimate_mb:.2f} MB")
        total_estimated_mb += memory_estimate_mb
        if base64_size_mb > 100:
            warnings.append("视频文件过大(>100MB Base64)")
        elif base64_size_mb > 50:
            warnings.append("视频文件较大(>50MB Base64)")
    # Optional audio file, same estimation rules.
    if audio_path and os.path.exists(audio_path):
        audio_size = os.path.getsize(audio_path)
        audio_size_mb = audio_size / 1024 / 1024
        base64_size_mb = audio_size_mb * 1.33
        memory_estimate_mb = base64_size_mb * 2
        print(f"\n🎵 音频文件: {os.path.basename(audio_path)}")
        print(f" 原始大小: {audio_size_mb:.2f} MB")
        print(f" Base64后: {base64_size_mb:.2f} MB")
        print(f" 内存估算: {memory_estimate_mb:.2f} MB")
        total_estimated_mb += memory_estimate_mb
        if base64_size_mb > 50:
            warnings.append("音频文件过大(>50MB Base64)")
    # Optional reference text.
    if txt_content:
        text_size_mb = len(txt_content.encode('utf-8')) / 1024 / 1024
        print(f"\n📝 文本内容: {len(txt_content)} 字符 ({text_size_mb:.3f} MB)")
        total_estimated_mb += text_size_mb
        if len(txt_content) > 50000:
            warnings.append("文本内容过长(>50k字符)")
    print(f"\n📋 总估算内存: {total_estimated_mb:.2f} MB")
    # Overall risk tiers: >500MB high, >200MB medium.
    if total_estimated_mb > 500:
        print("🚨 高风险: 内容过大,强烈建议压缩或分段处理")
        warnings.append("总内存占用过高(>500MB)")
    elif total_estimated_mb > 200:
        print("⚠️ 中风险: 建议监控内存使用")
        warnings.append("总内存占用较高(>200MB)")
    else:
        print("✅ 低风险: 内存占用在可接受范围内")
    return total_estimated_mb, warnings


def encode_video(video_path):
    """Read a video file and return its base64-encoded contents as str."""
    with open(video_path, "rb") as video_file:
        return base64.b64encode(video_file.read()).decode("utf-8")


def encode_audio(audio_path):
    """Read an audio file and return its base64-encoded contents as str."""
    with open(audio_path, "rb") as audio_file:
        return base64.b64encode(audio_file.read()).decode("utf-8")


def read_txt_file(txt_path):
    """Read a UTF-8 text file; return its content, or "" on any error."""
    try:
        with open(txt_path, 'r', encoding='utf-8') as file:
            content = file.read()
        print(f"成功读取txt文件: {txt_path}")
        print(f"文件内容长度: {len(content)} 字符")
        return content
    except FileNotFoundError:
        print(f"错误: 找不到文件 {txt_path}")
        return ""
    except Exception as e:
        print(f"读取文件时出错: {e}")
        return ""


def save_result_to_txt(response_text, video_path, save_dir="results"):
    """Save the analysis result to a timestamped TXT file.

    Returns the path of the written file, or None when the write fails.
    """
    os.makedirs(save_dir, exist_ok=True)
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    txt_filename = f"{video_name}_analysis_{timestamp}.txt"
    txt_path = os.path.join(save_dir, txt_filename)
    content = f"""视频分析结果
=====================================
视频文件: {video_path}
分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
=====================================

{response_text}
"""
    try:
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"\n✅ 分析结果已保存到: {txt_path}")
        return txt_path
    except Exception as e:
        print(f"\n❌ 保存TXT文件失败: {e}")
        return None


# Initialize the memory monitor.
monitor = MemoryMonitor()
STREAM_MODE = True

# File path configuration.
video_path = "/root/autodl-tmp/video2audio/sample_demo_6.mp4"
audio_path = "/root/autodl-tmp/video2audio/sample_demo_6.wav"
#txt_path = "/root/autodl-tmp/hot_video_analyse/source/example_reference.txt"

# Baseline memory checkpoint.
monitor.checkpoint("程序启动")
monitor.print_memory_status("程序启动时内存状态")

# Estimate payload sizes before committing to the encode step.
txt_content = ""
# NOTE: renamed from `warnings` so the stdlib module name is not shadowed.
estimated_memory, size_warnings = analyze_file_sizes(video_path, txt_content=txt_content)

# Surface any warnings before proceeding.
if size_warnings:
    print(f"\n⚠️ 发现以下潜在问题:")
    for warning in size_warnings:
        print(f" - {warning}")
    print(f"\n建议:")
    print(f" - 使用更小的测试文件")
    print(f" - 监控内存使用情况")
    print(f" - 如遇到错误,尝试压缩文件")

# Pre-encode memory check.
monitor.checkpoint("开始编码前")
risk_before = monitor.check_memory_risk()
if risk_before["system_risk"] == "高" or risk_before["gpu_risk"] == "高":
    print(f"\n🚨 警告: 当前内存使用率已经很高,继续可能导致内存溢出!")
    print(f" 系统内存风险: {risk_before['system_risk']}")
    print(f" GPU内存风险: {risk_before['gpu_risk']}")

print("\n开始编码文件...")
encode_start_time = time.time()
try:
    base64_video = encode_video(video_path)
    print(f"✅ 视频编码完成")
except Exception as e:
    print(f"❌ 视频编码失败: {e}")
    monitor.print_memory_status("编码失败时内存状态")
    raise SystemExit(1)  # safer than the site-injected exit()
base64_audio = encode_audio(audio_path)

# Post-encode memory check.
monitor.checkpoint("编码完成")
encode_end_time = time.time()
encode_duration = encode_end_time - encode_start_time
print(f"📁 文件编码完成,耗时: {encode_duration:.2f} 秒")

# Report memory growth attributable to the encode step.
last_checkpoint = monitor.checkpoints[-1]
if last_checkpoint["memory_diff"]:
    diff = last_checkpoint["memory_diff"]
    print(f"📊 编码过程内存变化:")
    print(f" 进程内存增加: {diff['process_memory_mb']:+.1f} MB")
    if diff["gpu_memory"]:
        for gpu_diff in diff["gpu_memory"]:
            print(f" GPU {gpu_diff['gpu_id']} 内存变化: {gpu_diff['used_mb_diff']:+.0f} MB")

client = OpenAI(
    api_key="EMPTY",
    base_url="http://localhost:8000/v1",
)

# Build the multimodal content list.
# FIX: video and audio were previously crammed into ONE dict with two
# "type" keys — the duplicate key silently overwrote "video_url" with
# "audio_url". They must be separate content parts.
content_list = [
    {
        "type": "video_url",
        "video_url": {"url": f"data:video/mp4;base64,{base64_video}"},
    },
    {
        "type": "audio_url",
        "audio_url": {"url": f"data:audio/wav;base64,{base64_audio}"},
    },
]

# Optional reference text, if provided.
if txt_content.strip():
    content_list.append({
        "type": "text",
        "text": f"参考文档内容:\n{txt_content}\n\n"
    })

# Main prompt (kept short to reduce memory use).
content_list.append({
    "type": "text",
    "text": """请分析这个抖音短视频的内容:

1. **口播内容**:转录视频中的语音内容
2. **字幕文字**:识别画面中的文字和字幕
3. **勾子分析**:分析视频的开头勾子策略

请用JSON格式输出结果:
{
 "口播分析": {"是否有口播": "", "口播内容": "", "讲话时长": ""},
 "字幕分析": {"是否有字幕": "", "字幕内容": "", "字幕位置": ""},
 "勾子分析": {"勾子类型": "", "勾子公式": "", "勾子内容": ""}
}"""
})

# Pre-request memory check.
monitor.checkpoint("API请求前")
monitor.print_memory_status("API请求前内存状态")

print(f"\n🚀 开始请求API...")
print(f"📅 请求时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🔄 Stream模式: {STREAM_MODE}")
print(f"📋 Content项目数量: {len(content_list)}")

# Rough request size (string repr of each content part).
total_request_size = sum(len(str(content)) for content in content_list)
print(f"📏 请求总大小: {total_request_size/1024/1024:.2f} MB")

api_start_time = time.time()
try:
    completion = client.chat.completions.create(
        model="/root/autodl-tmp/llm/Qwen-omni",
        messages=[
            {
                "role": "system",
                "content": [{"type": "text", "text": "You are a helpful assistant."}]
            },
            {
                "role": "user",
                "content": content_list
            }
        ],
        stream=STREAM_MODE,
        stream_options={"include_usage": True} if STREAM_MODE else None,
        max_tokens=1024,  # cap output length to save memory
    )
    if STREAM_MODE:
        full_response = ""
        usage_info = None
        first_token_time = None
        token_count = 0
        print("✨ 正在生成回复...")
        for chunk in completion:
            if chunk.choices:
                delta = chunk.choices[0].delta
                if delta.content:
                    if first_token_time is None:
                        first_token_time = time.time()
                        first_token_delay = first_token_time - api_start_time
                        print(f"🚀 首个token延迟: {first_token_delay:.2f} 秒")
                    full_response += delta.content
                    token_count += 1
            else:
                # Final usage chunk (stream_options include_usage).
                usage_info = chunk.usage
        api_end_time = time.time()
        total_duration = api_end_time - api_start_time
        print("\n" + "="*50)
        print("📝 完整回复:")
        print("="*50)
        print(full_response)
        # Persist the result.
        txt_file_path = save_result_to_txt(full_response, video_path)
        # Post-API memory check.
        monitor.checkpoint("API完成")
        # Timing statistics.
        print("\n" + "="*50)
        print("⏱️ 时间统计:")
        print("="*50)
        print(f"📁 文件编码时间: {encode_duration:.2f} 秒")
        if first_token_time:
            print(f"🚀 首个token延迟: {first_token_delay:.2f} 秒")
            generation_time = api_end_time - first_token_time
            print(f"⚡ 内容生成时间: {generation_time:.2f} 秒")
        print(f"🕐 API总响应时间: {total_duration:.2f} 秒")
        print(f"📊 生成token数量: {token_count}")
        if first_token_time and token_count > 0:
            tokens_per_second = token_count / generation_time
            print(f"🔥 生成速度: {tokens_per_second:.2f} tokens/秒")
        print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if usage_info:
            print(f"\n📈 使用情况: {usage_info}")
    else:
        # FIX: the non-stream path previously discarded the response.
        full_response = completion.choices[0].message.content
        total_duration = time.time() - api_start_time
        print("\n" + "="*50)
        print("📝 完整回复:")
        print("="*50)
        print(full_response)
        txt_file_path = save_result_to_txt(full_response, video_path)
        monitor.checkpoint("API完成")
        print(f"🕐 API总响应时间: {total_duration:.2f} 秒")
except Exception as e:
    print(f"\n❌ API请求失败!")
    print(f"错误类型: {type(e)}")
    print(f"错误信息: {e}")
    # Capture memory state at failure time.
    monitor.checkpoint("API错误")
    monitor.print_memory_status("API错误时内存状态")
    # Heuristic diagnosis for likely OOM failures.
    if "Internal Server Error" in str(e) or "OutOfMemoryError" in str(e):
        print(f"\n💡 可能的内存溢出原因:")
        print(f" - 视频文件过大 ({estimated_memory:.1f} MB)")
        print(f" - GPU内存不足")
        print(f" - 系统内存不足")
        print(f"\n建议解决方案:")
        print(f" - 使用更小的视频文件")
        print(f" - 重启vLLM服务释放GPU内存")
        print(f" - 降低max_tokens限制")

# Final memory report across all checkpoints.
print(f"\n{'='*60}")
print("📊 最终内存使用报告")
print(f"{'='*60}")
for i, checkpoint in enumerate(monitor.checkpoints):
    print(f"{i+1}. {checkpoint['name']}")
    if checkpoint['memory_diff']:
        diff = checkpoint['memory_diff']
        if abs(diff['process_memory_mb']) > 10:  # show only significant changes
            print(f" 进程内存变化: {diff['process_memory_mb']:+.1f} MB")
        if diff['gpu_memory']:
            for gpu_diff in diff['gpu_memory']:
                if abs(gpu_diff['used_mb_diff']) > 50:  # show only significant changes
                    print(f" GPU {gpu_diff['gpu_id']} 变化: {gpu_diff['used_mb_diff']:+.0f} MB")

monitor.print_memory_status("程序结束时内存状态")