# hot_video_analyse/code/video_to_text.py

import os
import subprocess
import whisper
import torch
from moviepy import VideoFileClip
import warnings
warnings.filterwarnings("ignore")
class VideoToTextConverter:
    """Convert a video file to text.

    Pipeline: extract the audio track (moviepy first, ffmpeg as fallback),
    then run OpenAI Whisper speech recognition and write the transcript
    (plain text + a timestamped per-segment file) next to the video or
    into a caller-supplied output directory.
    """

    def __init__(self, model_size="base"):
        """
        Initialize the converter and eagerly load the Whisper model.

        Args:
            model_size: Whisper model size
                ("tiny", "base", "small", "medium", "large")
        """
        self.model_size = model_size
        self.model = None
        self.load_model()

    def load_model(self):
        """Load the Whisper model, retrying on CPU if the default device fails.

        If the CPU retry also fails, the exception propagates to the caller.
        """
        try:
            print(f"正在加载Whisper {self.model_size} 模型...")
            self.model = whisper.load_model(self.model_size)
            print("模型加载成功!")
        except Exception as e:
            print(f"模型加载失败: {e}")
            print("尝试使用CPU模式...")
            # Last-resort attempt: force CPU (e.g. when CUDA init failed).
            self.model = whisper.load_model(self.model_size, device="cpu")

    def extract_audio_moviepy(self, video_path, audio_path):
        """
        Extract audio using moviepy.

        Args:
            video_path: path to the input video file
            audio_path: path for the output audio file

        Returns:
            True on success, False on any failure (errors are printed,
            not raised, so the caller can try the ffmpeg fallback).
        """
        try:
            print("正在使用moviepy提取音频...")
            video = VideoFileClip(video_path)
            audio = video.audio
            # BUGFIX: the old `verbose=False` kwarg was removed in moviepy 2.x
            # (the version targeted by `from moviepy import VideoFileClip`);
            # passing it raised TypeError, which the broad except swallowed,
            # silently forcing the ffmpeg fallback on every call.
            audio.write_audiofile(audio_path, logger=None)
            audio.close()
            video.close()
            print(f"音频提取成功: {audio_path}")
            return True
        except Exception as e:
            print(f"moviepy提取音频失败: {e}")
            return False

    def extract_audio_ffmpeg(self, video_path, audio_path):
        """
        Extract audio using the ffmpeg command-line tool.

        Args:
            video_path: path to the input video file
            audio_path: path for the output audio file

        Returns:
            True on success, False otherwise (including when the ffmpeg
            binary is not installed — FileNotFoundError is caught below).
        """
        try:
            print("正在使用ffmpeg提取音频...")
            # 16 kHz mono PCM is the input format Whisper expects natively.
            cmd = [
                'ffmpeg', '-i', video_path,
                '-vn',                      # drop the video stream
                '-acodec', 'pcm_s16le',     # uncompressed 16-bit PCM
                '-ar', '16000',             # 16 kHz sample rate
                '-ac', '1',                 # mono
                '-y',                       # overwrite output file
                audio_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                print(f"音频提取成功: {audio_path}")
                return True
            print(f"ffmpeg错误: {result.stderr}")
            return False
        except Exception as e:
            print(f"ffmpeg提取音频失败: {e}")
            return False

    def extract_audio(self, video_path, audio_path=None):
        """
        Extract the audio track, trying moviepy first and ffmpeg second.

        Args:
            video_path: path to the input video file
            audio_path: output audio path; auto-derived from the video
                name (``<name>_audio.wav`` beside the video) when None

        Returns:
            The audio file path on success, or None if both backends failed.
        """
        if audio_path is None:
            video_dir = os.path.dirname(video_path)
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            audio_path = os.path.join(video_dir, f"{video_name}_audio.wav")
        # BUGFIX: only create the parent directory when there is one —
        # os.makedirs("") raises FileNotFoundError for bare filenames.
        parent_dir = os.path.dirname(audio_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        # Try moviepy first, then fall back to the ffmpeg CLI.
        if self.extract_audio_moviepy(video_path, audio_path):
            return audio_path
        if self.extract_audio_ffmpeg(video_path, audio_path):
            return audio_path
        print("所有音频提取方法都失败了")
        return None

    def transcribe_audio(self, audio_path, language="zh"):
        """
        Transcribe an audio file to text with Whisper.

        Args:
            audio_path: path to the audio file
            language: language code ("zh" Chinese, "en" English,
                "auto" or None for automatic detection)

        Returns:
            The Whisper result dict (keys: "text", "segments",
            "language"), or None when the model is unavailable or
            transcription failed.
        """
        if self.model is None:
            print("模型未加载,无法进行转录")
            return None
        try:
            print("正在进行语音识别...")
            options = {
                # "auto" is normalized to None, which triggers
                # Whisper's built-in language detection.
                "language": language if language != "auto" else None,
                "task": "transcribe",
                # Half precision is only beneficial (and safe) on GPU.
                "fp16": torch.cuda.is_available(),
            }
            result = self.model.transcribe(audio_path, **options)
            print("语音识别完成!")
            return result
        except Exception as e:
            print(f"语音识别失败: {e}")
            return None

    def video_to_text(self, video_path, output_dir=None, language="zh", save_audio=True):
        """
        Full video-to-text pipeline: extract audio, transcribe, save results.

        Args:
            video_path: path to the input video file
            output_dir: output directory; the video's directory when None
            language: language code passed to :meth:`transcribe_audio`
            save_audio: keep the extracted audio file when True,
                delete it after transcription when False

        Returns:
            dict: on success ``{"success": True, "text", "segments",
            "language", "audio_path", "text_path", "detailed_path"}``;
            on failure ``{"success": False, "error": <message>}``.
        """
        if output_dir is None:
            output_dir = os.path.dirname(video_path)
        # Robustness: make sure the output directory exists before writing.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        # Step 1: extract the audio track.
        audio_path = os.path.join(output_dir, f"{video_name}_audio.wav")
        extracted_audio = self.extract_audio(video_path, audio_path)
        if extracted_audio is None:
            return {"success": False, "error": "音频提取失败"}

        # Step 2: run speech recognition.
        transcription = self.transcribe_audio(extracted_audio, language)
        if transcription is None:
            return {"success": False, "error": "语音识别失败"}

        # Step 3: save the plain transcript.
        text_path = os.path.join(output_dir, f"{video_name}_transcription.txt")
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(transcription["text"])

        # Step 4: save the detailed, timestamped transcript.
        detailed_path = os.path.join(output_dir, f"{video_name}_detailed.txt")
        with open(detailed_path, 'w', encoding='utf-8') as f:
            for segment in transcription["segments"]:
                start_time = segment["start"]
                end_time = segment["end"]
                text = segment["text"]
                f.write(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}\n")

        # Optionally remove the intermediate audio file.
        if not save_audio and os.path.exists(extracted_audio):
            os.remove(extracted_audio)
            extracted_audio = None

        return {
            "success": True,
            "text": transcription["text"],
            "segments": transcription["segments"],
            "language": transcription["language"],
            "audio_path": extracted_audio,
            "text_path": text_path,
            "detailed_path": detailed_path,
        }

    def print_result(self, result):
        """Pretty-print a result dict produced by :meth:`video_to_text`."""
        if not result["success"]:
            print(f"转录失败: {result['error']}")
            return
        print("\n" + "=" * 50)
        print("视频转文字结果")
        print("=" * 50)
        print(f"检测到的语言: {result['language']}")
        print(f"文本文件: {result['text_path']}")
        print(f"详细文件: {result['detailed_path']}")
        if result['audio_path']:
            print(f"音频文件: {result['audio_path']}")
        print("\n完整文本:")
        print("-" * 30)
        print(result["text"])
        print("\n分段文本 (前5段):")
        print("-" * 30)
        # Show at most the first five segments to keep output readable.
        for i, segment in enumerate(result["segments"][:5]):
            start_time = segment["start"]
            end_time = segment["end"]
            text = segment["text"]
            print(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}")
        if len(result["segments"]) > 5:
            print(f"... 还有 {len(result['segments']) - 5} 个分段")
def main():
    """Example usage: transcribe one sample video into text files."""
    converter = VideoToTextConverter(model_size="base")

    video_path = "/root/autodl-tmp/hot_video_analyse/source/sample_demo_1.mp4"
    output_dir = "/root/autodl-tmp/hot_video_analyse/source/transcription"

    # Make sure the destination directory exists before writing anything.
    os.makedirs(output_dir, exist_ok=True)

    # "auto" lets Whisper detect the language; pass "zh" or "en" to force one.
    result = converter.video_to_text(
        video_path=video_path,
        output_dir=output_dir,
        language="auto",
        save_audio=True,
    )

    converter.print_result(result)


if __name__ == "__main__":
    main()