"""Video-to-text conversion: extract a video's audio track (moviepy, with an
ffmpeg fallback) and transcribe it with OpenAI Whisper."""
import os
|
||
import subprocess
|
||
import whisper
|
||
import torch
|
||
from moviepy import VideoFileClip
|
||
import warnings
|
||
warnings.filterwarnings("ignore")
|
||
|
||
class VideoToTextConverter:
    """Convert a video file to text.

    Pipeline: extract the audio track (moviepy first, ffmpeg as a fallback),
    transcribe it with OpenAI Whisper, and write plain plus timestamped
    transcript files. Entry point for the full flow is ``video_to_text``.
    """

    def __init__(self, model_size="base"):
        """Initialize the converter and eagerly load the Whisper model.

        Args:
            model_size: Whisper model size ("tiny", "base", "small",
                "medium", "large").
        """
        self.model_size = model_size
        self.model = None  # set by load_model(); stays None only if loading fails
        self.load_model()

    def load_model(self):
        """Load the Whisper model, retrying on CPU if the default device fails."""
        try:
            print(f"正在加载Whisper {self.model_size} 模型...")
            self.model = whisper.load_model(self.model_size)
            print("模型加载成功!")
        except Exception as e:
            print(f"模型加载失败: {e}")
            print("尝试使用CPU模式...")
            # Fallback: force CPU (e.g. when CUDA is present but broken).
            self.model = whisper.load_model(self.model_size, device="cpu")

    def extract_audio_moviepy(self, video_path, audio_path):
        """Extract the audio track using moviepy.

        Args:
            video_path: Path of the input video file.
            audio_path: Path of the output audio file.

        Returns:
            bool: True on success, False on any failure.
        """
        video = None
        audio = None
        try:
            print("正在使用moviepy提取音频...")
            video = VideoFileClip(video_path)
            audio = video.audio
            if audio is None:
                # The video has no audio track at all.
                print("moviepy提取音频失败: 视频没有音轨")
                return False
            # BUGFIX: moviepy 2.x removed the `verbose` kwarg (passing it
            # raised TypeError); `logger=None` alone silences the output.
            audio.write_audiofile(audio_path, logger=None)
            print(f"音频提取成功: {audio_path}")
            return True
        except Exception as e:
            print(f"moviepy提取音频失败: {e}")
            return False
        finally:
            # Always release clip resources, even when extraction fails.
            if audio is not None:
                audio.close()
            if video is not None:
                video.close()

    def extract_audio_ffmpeg(self, video_path, audio_path):
        """Extract the audio track by shelling out to ffmpeg.

        Args:
            video_path: Path of the input video file.
            audio_path: Path of the output audio file.

        Returns:
            bool: True on success, False on any failure (including ffmpeg
            not being installed).
        """
        try:
            print("正在使用ffmpeg提取音频...")
            cmd = [
                'ffmpeg', '-i', video_path,
                '-vn',                   # drop the video stream
                '-acodec', 'pcm_s16le',  # 16-bit PCM
                '-ar', '16000',          # 16 kHz sample rate (what Whisper resamples to)
                '-ac', '1',              # mono
                '-y',                    # overwrite the output file
                audio_path
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                print(f"音频提取成功: {audio_path}")
                return True
            else:
                print(f"ffmpeg错误: {result.stderr}")
                return False
        except Exception as e:
            print(f"ffmpeg提取音频失败: {e}")
            return False

    def extract_audio(self, video_path, audio_path=None):
        """Extract the audio from a video, trying moviepy then ffmpeg.

        Args:
            video_path: Path of the input video file.
            audio_path: Path of the output audio file; auto-generated next
                to the video when None.

        Returns:
            str | None: Path of the extracted audio on success, else None.
        """
        if audio_path is None:
            # Derive "<video_dir>/<video_name>_audio.wav" automatically.
            video_dir = os.path.dirname(video_path)
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            audio_path = os.path.join(video_dir, f"{video_name}_audio.wav")

        # BUGFIX: os.makedirs("") raises FileNotFoundError, so only create
        # the directory when the path actually contains one.
        out_dir = os.path.dirname(audio_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        # Try moviepy first.
        if self.extract_audio_moviepy(video_path, audio_path):
            return audio_path

        # Fall back to ffmpeg when moviepy fails.
        if self.extract_audio_ffmpeg(video_path, audio_path):
            return audio_path

        print("所有音频提取方法都失败了")
        return None

    def transcribe_audio(self, audio_path, language="zh"):
        """Transcribe an audio file with Whisper.

        Args:
            audio_path: Path of the audio file.
            language: Language code ("zh" Chinese, "en" English,
                "auto"/None for auto-detection).

        Returns:
            dict | None: Whisper's result dict ("text", "segments",
            "language"), or None on failure.
        """
        if self.model is None:
            print("模型未加载,无法进行转录")
            return None

        try:
            print("正在进行语音识别...")

            options = {
                # Whisper auto-detects the language when it is None.
                "language": language if language != "auto" else None,
                "task": "transcribe",
                "fp16": torch.cuda.is_available()  # half precision only on GPU
            }

            result = self.model.transcribe(audio_path, **options)

            print("语音识别完成!")
            return result

        except Exception as e:
            print(f"语音识别失败: {e}")
            return None

    def video_to_text(self, video_path, output_dir=None, language="zh", save_audio=True):
        """Run the full video-to-text pipeline.

        Args:
            video_path: Path of the input video file.
            output_dir: Output directory; defaults to the video's directory.
            language: Language code passed to ``transcribe_audio``.
            save_audio: Keep the extracted audio file when True.

        Returns:
            dict: On success, {"success": True, "text", "segments",
            "language", "audio_path", "text_path", "detailed_path"};
            on failure, {"success": False, "error": <message>}.
        """
        if output_dir is None:
            output_dir = os.path.dirname(video_path)

        video_name = os.path.splitext(os.path.basename(video_path))[0]

        # Extract the audio track.
        audio_path = os.path.join(output_dir, f"{video_name}_audio.wav")
        extracted_audio = self.extract_audio(video_path, audio_path)

        if extracted_audio is None:
            return {"success": False, "error": "音频提取失败"}

        # Transcribe the audio.
        transcription = self.transcribe_audio(extracted_audio, language)

        if transcription is None:
            return {"success": False, "error": "语音识别失败"}

        # Save the plain transcript.
        text_path = os.path.join(output_dir, f"{video_name}_transcription.txt")
        with open(text_path, 'w', encoding='utf-8') as f:
            f.write(transcription["text"])

        # Save the detailed transcript with per-segment timestamps.
        detailed_path = os.path.join(output_dir, f"{video_name}_detailed.txt")
        with open(detailed_path, 'w', encoding='utf-8') as f:
            for segment in transcription["segments"]:
                start_time = segment["start"]
                end_time = segment["end"]
                text = segment["text"]
                f.write(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}\n")

        # Remove the intermediate audio file unless the caller wants it kept.
        if not save_audio and os.path.exists(extracted_audio):
            os.remove(extracted_audio)
            extracted_audio = None

        return {
            "success": True,
            "text": transcription["text"],
            "segments": transcription["segments"],
            "language": transcription["language"],
            "audio_path": extracted_audio,
            "text_path": text_path,
            "detailed_path": detailed_path
        }

    def print_result(self, result):
        """Pretty-print a ``video_to_text`` result dict to stdout."""
        if not result["success"]:
            print(f"转录失败: {result['error']}")
            return

        print("\n" + "="*50)
        print("视频转文字结果")
        print("="*50)
        print(f"检测到的语言: {result['language']}")
        print(f"文本文件: {result['text_path']}")
        print(f"详细文件: {result['detailed_path']}")
        if result['audio_path']:
            print(f"音频文件: {result['audio_path']}")

        print("\n完整文本:")
        print("-" * 30)
        print(result["text"])

        print("\n分段文本 (前5段):")
        print("-" * 30)
        # Only the first five segments are shown; the index was unused.
        for segment in result["segments"][:5]:
            start_time = segment["start"]
            end_time = segment["end"]
            text = segment["text"]
            print(f"[{start_time:.2f}s - {end_time:.2f}s]: {text}")

        if len(result["segments"]) > 5:
            print(f"... 还有 {len(result['segments']) - 5} 个分段")
||
|
||
def main(video_path="/root/autodl-tmp/hot_video_analyse/source/sample_demo_1.mp4",
         output_dir="/root/autodl-tmp/hot_video_analyse/source/transcription",
         language="auto", model_size="base", save_audio=True):
    """Example usage: transcribe one video and print the result.

    The previously hard-coded demo paths are now keyword parameters with the
    same values as defaults, so ``main()`` behaves exactly as before while
    allowing reuse with other files.

    Args:
        video_path: Video file to transcribe (defaults to the demo clip).
        output_dir: Directory for the audio and transcript outputs.
        language: "zh" (Chinese), "en" (English), or "auto" to auto-detect.
        model_size: Whisper model size to load.
        save_audio: Keep the extracted .wav alongside the transcripts.
    """
    # Initialize the converter (loads the Whisper model).
    converter = VideoToTextConverter(model_size=model_size)

    # Make sure the output directory exists before writing anything.
    os.makedirs(output_dir, exist_ok=True)

    # Convert the video to text.
    result = converter.video_to_text(
        video_path=video_path,
        output_dir=output_dir,
        language=language,
        save_audio=save_audio,
    )

    # Print the result.
    converter.print_result(result)


if __name__ == "__main__":
    main()