#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Simple batch Whisper transcription script.

Runs the original ``whisper_audio_transcribe.py`` script once for every
video directory that contains an ``audio_split`` sub-directory.
"""

import os
import sys
import subprocess
from pathlib import Path
import argparse
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Script invoked for each video directory. The path is relative, so it is
# resolved against the current working directory of this process.
WHISPER_SCRIPT = "code/whisper_audio_transcribe.py"


def find_audio_dirs(video_processed_dir):
    """Find every video directory that contains an ``audio_split`` directory.

    Args:
        video_processed_dir: Path (str or ``Path``) whose immediate
            sub-directories are scanned.

    Returns:
        A list of dicts, sorted by video directory path, each with the keys
        ``'video_dir'``, ``'audio_dir'`` (``video_dir / "audio_split"``) and
        ``'whisper_dir'`` (``video_dir / "whisper"``). The list is empty when
        ``video_processed_dir`` is not an existing directory.
    """
    root = Path(video_processed_dir)
    if not root.is_dir():
        # Report a bad input path instead of letting iterdir() raise.
        logger.error("输入目录不存在或不是目录: %s", root)
        return []

    # sorted() makes the processing order deterministic; iterdir() yields
    # entries in arbitrary order.
    return [
        {
            'video_dir': video_dir,
            'audio_dir': video_dir / "audio_split",
            'whisper_dir': video_dir / "whisper",
        }
        for video_dir in sorted(root.iterdir())
        if video_dir.is_dir() and (video_dir / "audio_split").is_dir()
    ]


def process_single_video(video_info, model_size="base", language="zh", task="transcribe"):
    """Transcribe one video directory by running the whisper script on it.

    Args:
        video_info: Dict with ``'video_dir'``, ``'audio_dir'`` and
            ``'whisper_dir'`` ``Path`` values, as built by
            ``find_audio_dirs``.
        model_size: Value passed to the script's ``-m`` option.
        language: Value passed to the script's ``-l`` option.
        task: Value passed to the script's ``-t`` option.

    Returns:
        ``True`` if the subprocess exited with status 0, ``False`` otherwise.
    """
    video_dir = video_info['video_dir']
    audio_dir = video_info['audio_dir']
    whisper_dir = video_info['whisper_dir']

    logger.info("处理视频目录: %s", video_dir.name)

    # Create the output directory for this video.
    whisper_dir.mkdir(parents=True, exist_ok=True)

    # Invoke the original whisper_audio_transcribe.py with the same interpreter.
    cmd = [
        sys.executable, WHISPER_SCRIPT,
        str(audio_dir),
        "-m", model_size,
        "-l", language,
        "-t", task,
        "-o", str(whisper_dir),
    ]

    logger.info("执行命令: %s", ' '.join(cmd))
    try:
        # errors="replace" keeps undecodable bytes in the child's output from
        # raising UnicodeDecodeError in this process.
        subprocess.run(cmd, capture_output=True, text=True, errors="replace", check=True)
    except subprocess.CalledProcessError as e:
        logger.error("❌ %s 处理失败: %s", video_dir.name, e)
        logger.error("错误输出: %s", e.stderr)
        return False

    logger.info("✅ %s 处理成功", video_dir.name)
    return True


def main(argv=None):
    """Parse command-line arguments and transcribe every video directory.

    Args:
        argv: Optional list of argument strings; when ``None`` argparse reads
            ``sys.argv[1:]``.

    Returns:
        Process exit status: ``0`` when nothing failed (including when no
        ``audio_split`` directories were found), ``1`` when the input path is
        not a directory or at least one video directory failed.
    """
    parser = argparse.ArgumentParser(description="简单的批量Whisper转录")
    parser.add_argument("input", help="video_processed目录路径")
    # %(default)s keeps the help text in sync with the real default; the
    # previous text claimed "base" while the default was "medium".
    parser.add_argument("-m", "--model", default="medium",
                        choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"],
                        help="Whisper模型大小 (默认: %(default)s)")
    parser.add_argument("-l", "--language", default="zh",
                        help="语言代码 (zh=中文, en=英文, auto=自动检测, 默认: zh)")
    parser.add_argument("-t", "--task", default="transcribe",
                        choices=["transcribe", "translate"],
                        help="任务类型 (transcribe=转录, translate=翻译为英文, 默认: transcribe)")

    args = parser.parse_args(argv)

    if not Path(args.input).is_dir():
        logger.error("输入目录不存在或不是目录: %s", args.input)
        return 1

    # Collect all directories that have audio to transcribe.
    audio_dirs = find_audio_dirs(args.input)

    if not audio_dirs:
        logger.warning("在 %s 中未找到包含audio_split的目录", args.input)
        return 0

    total = len(audio_dirs)
    logger.info("找到 %s 个视频目录需要处理", total)

    # Process each video directory in turn; a failure does not stop the batch.
    success_count = 0
    failed_count = 0

    for i, video_info in enumerate(audio_dirs, 1):
        logger.info("处理第 %s/%s 个视频目录", i, total)

        if process_single_video(video_info, args.model, args.language, args.task):
            success_count += 1
        else:
            failed_count += 1

    # Print the summary.
    print("\n✅ 批量Whisper转录完成!")
    print(f"📁 输入目录: {args.input}")
    print(f"📊 成功处理: {success_count}/{total} 个视频目录")
    if failed_count > 0:
        print(f"❌ 失败: {failed_count} 个")
    print(f"🔍 模型大小: {args.model}")
    print(f"🌍 语言: {args.language}")
    print(f"📝 任务: {args.task}")

    # Surface failures through the exit status so callers/scripts can detect them.
    return 1 if failed_count else 0


if __name__ == "__main__":
    sys.exit(main())