import os
import subprocess
import warnings

import torch
import whisper
from moviepy import VideoFileClip

warnings.filterwarnings("ignore")


class VideoToTextConverter:
    """Extract the audio track from a video file and transcribe it with OpenAI Whisper."""

    def __init__(self, model_size="base"):
        """
        Initialize the video-to-text converter and eagerly load the model.

        Args:
            model_size: Whisper model size
                ("tiny", "base", "small", "medium", "large").
        """
        self.model_size = model_size
        self.model = None
        self.load_model()

    def load_model(self):
        """Load the Whisper model, retrying on CPU if the default device fails."""
        try:
            print(f"正在加载Whisper {self.model_size} 模型...")
            self.model = whisper.load_model(self.model_size)
            print("模型加载成功!")
        except Exception as e:
            print(f"模型加载失败: {e}")
            print("尝试使用CPU模式...")
            # Fallback: force CPU (e.g. when CUDA is present but broken/out of memory).
            self.model = whisper.load_model(self.model_size, device="cpu")

    def extract_audio_moviepy(self, video_path, audio_path):
        """
        Extract audio using moviepy.

        Args:
            video_path: path of the input video file.
            audio_path: path of the audio file to write.

        Returns:
            bool: True on success, False on failure.
        """
        video = None
        audio = None
        try:
            print("正在使用moviepy提取音频...")
            video = VideoFileClip(video_path)
            audio = video.audio
            if audio is None:
                # Videos without an audio track expose `audio` as None;
                # calling write_audiofile on it would raise AttributeError.
                print("moviepy提取音频失败: 视频没有音频轨道")
                return False
            # moviepy 2.x (the `from moviepy import ...` API used here) removed
            # the `verbose` keyword — passing it raises TypeError.
            audio.write_audiofile(audio_path, logger=None)
            print(f"音频提取成功: {audio_path}")
            return True
        except Exception as e:
            print(f"moviepy提取音频失败: {e}")
            return False
        finally:
            # Always release the underlying ffmpeg reader processes,
            # even when an exception interrupts the extraction.
            if audio is not None:
                audio.close()
            if video is not None:
                video.close()

    def extract_audio_ffmpeg(self, video_path, audio_path):
        """
        Extract audio by invoking the ffmpeg CLI.

        Args:
            video_path: path of the input video file.
            audio_path: path of the audio file to write.

        Returns:
            bool: True on success, False on failure.
        """
        try:
            print("正在使用ffmpeg提取音频...")
            # List form (shell=False) avoids shell-quoting issues with paths.
            cmd = [
                'ffmpeg',
                '-i', video_path,
                '-vn',                      # drop the video stream
                '-acodec', 'pcm_s16le',     # 16-bit PCM audio codec
                '-ar', '16000',             # 16 kHz sample rate (Whisper's native rate)
                '-ac', '1',                 # mono
                '-y',                       # overwrite the output file if it exists
                audio_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                print(f"音频提取成功: {audio_path}")
                return True
            else:
                print(f"ffmpeg错误: {result.stderr}")
                return False
        except Exception as e:
            # e.g. FileNotFoundError when ffmpeg is not installed.
            print(f"ffmpeg提取音频失败: {e}")
            return False

    def extract_audio(self, video_path, audio_path=None):
        """
        Extract the audio from a video, trying moviepy first and then ffmpeg.

        Args:
            video_path: path of the input video file.
            audio_path: output audio file path; auto-generated next to the
                video as "<name>_audio.wav" when None.

        Returns:
            str | None: path of the extracted audio file, or None on failure.
        """
        if audio_path is None:
            # Derive "<video dir>/<video name>_audio.wav" automatically.
            video_dir = os.path.dirname(video_path)
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            audio_path = os.path.join(video_dir, f"{video_name}_audio.wav")

        # Ensure the output directory exists. Skip when the path has no
        # directory component: os.makedirs("") raises FileNotFoundError.
        audio_dir = os.path.dirname(audio_path)
        if audio_dir:
            os.makedirs(audio_dir, exist_ok=True)

        # Try moviepy first...
        if self.extract_audio_moviepy(video_path, audio_path):
            return audio_path

        # ...and fall back to the ffmpeg CLI if moviepy failed.
        if self.extract_audio_ffmpeg(video_path, audio_path):
            return audio_path

        print("所有音频提取方法都失败了")
        return None

    def transcribe_audio(self, audio_path, language="zh"):
        """
        Transcribe an audio file to text.

        Args:
            audio_path: path of the audio file.
            language: language code ("zh" Chinese, "en" English,
                "auto" or None for automatic detection).

        Returns:
            dict | None: Whisper transcription result, or None on failure.
        """
        if self.model is None:
            print("模型未加载,无法进行转录")
            return None
        try:
            print("正在进行语音识别...")
            options = {
                # "auto" maps to None, which tells Whisper to detect the language.
                "language": language if language != "auto" else None,
                "task": "transcribe",
                # Half precision only helps (and is only safe) on GPU.
                "fp16": torch.cuda.is_available()
            }
            result = self.model.transcribe(audio_path, **options)
            print("语音识别完成!")
            return result
        except Exception as e:
            print(f"语音识别失败: {e}")
            return None

    def video_to_text(self, video_path, output_dir=None, language="zh", save_audio=True):
        """
        Full video-to-text pipeline: extract audio, transcribe, save results.

        Args:
            video_path: path of the input video file.
            output_dir: output directory; the video's directory when None.
            language: language code passed to transcribe_audio.
            save_audio: keep the intermediate audio file when True.

        Returns:
            dict: on success {"success": True, "text", "segments", "language",
                "audio_path", "text_path", "detailed_path"};
                on failure {"success": False, "error": <message>}.
        """
        if output_dir is None:
            output_dir = os.path.dirname(video_path)
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        # Step 1: extract the audio track.
        audio_path = os.path.join(output_dir, f"{video_name}_audio.wav")
        extracted_audio = self.extract_audio(video_path, audio_path)
        if extracted_audio is None:
            return {"success": False, "error": "音频提取失败"}

        # Step 2: transcribe it.
        transcription = self.transcribe_audio(extracted_audio, language)
        if transcription is None:
            return {"success": False, "error": "语音识别失败"}

        # Step 3: save the plain transcription text.
        text_path = os.path.join(output_dir, f"{video_name}_transcription.txt")
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(transcription["text"])

        # Step 4: save the per-segment transcript with timestamps.
        detailed_path = os.path.join(output_dir, f"{video_name}_detailed.txt")
        with open(detailed_path, 'w', encoding='utf-8') as f:
            for segment in transcription["segments"]:
                start_time = segment["start"]
                end_time = segment["end"]
                text = segment["text"]
                f.write(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}\n")

        # Step 5: optionally delete the intermediate audio file.
        if not save_audio and os.path.exists(extracted_audio):
            os.remove(extracted_audio)
            extracted_audio = None

        return {
            "success": True,
            "text": transcription["text"],
            "segments": transcription["segments"],
            "language": transcription["language"],
            "audio_path": extracted_audio,
            "text_path": text_path,
            "detailed_path": detailed_path
        }

    def print_result(self, result):
        """Pretty-print a result dict produced by video_to_text."""
        if not result["success"]:
            print(f"转录失败: {result['error']}")
            return

        print("\n" + "="*50)
        print("视频转文字结果")
        print("="*50)
        print(f"检测到的语言: {result['language']}")
        print(f"文本文件: {result['text_path']}")
        print(f"详细文件: {result['detailed_path']}")
        if result['audio_path']:
            print(f"音频文件: {result['audio_path']}")

        print("\n完整文本:")
        print("-" * 30)
        print(result["text"])

        # Show only the first few segments to keep the console output short.
        print("\n分段文本 (前5段):")
        print("-" * 30)
        for i, segment in enumerate(result["segments"][:5]):
            start_time = segment["start"]
            end_time = segment["end"]
            text = segment["text"]
            print(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}")
        if len(result["segments"]) > 5:
            print(f"... 还有 {len(result['segments']) - 5} 个分段")


def main():
    """Example usage: transcribe a sample video and print the result."""
    converter = VideoToTextConverter(model_size="base")

    video_path = "/root/autodl-tmp/hot_video_analyse/source/sample_demo_1.mp4"
    output_dir = "/root/autodl-tmp/hot_video_analyse/source/transcription"

    # Make sure the output directory exists before writing into it.
    os.makedirs(output_dir, exist_ok=True)

    result = converter.video_to_text(
        video_path=video_path,
        output_dir=output_dir,
        language="auto",  # "zh" Chinese, "en" English, "auto" autodetect
        save_audio=True
    )

    converter.print_result(result)


if __name__ == "__main__":
    main()