323 lines
12 KiB
Python
323 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
视频处理整合工具
|
||
先进行视频切割,再转换为音频
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import subprocess
|
||
from pathlib import Path
|
||
import logging
|
||
from typing import List, Tuple
|
||
|
||
# 导入自定义模块
|
||
from video_splitter import get_video_files, split_video, batch_split_videos
|
||
from video2audio import Video2AudioExtractor
|
||
|
||
# 设置日志
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class VideoProcessor:
|
||
"""视频处理整合类"""
|
||
|
||
def __init__(self, input_dir: str, output_dir: str, segment_duration: int = 30, audio_format: str = "wav"):
|
||
"""
|
||
初始化视频处理器
|
||
|
||
Args:
|
||
input_dir: 输入视频目录
|
||
output_dir: 输出目录
|
||
segment_duration: 每个片段的时长(秒)
|
||
audio_format: 音频格式
|
||
"""
|
||
self.input_dir = Path(input_dir)
|
||
self.output_dir = Path(output_dir)
|
||
self.segment_duration = segment_duration
|
||
self.audio_format = audio_format
|
||
|
||
# 创建主输出目录
|
||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
logger.info(f"视频处理器初始化完成")
|
||
logger.info(f"输入目录: {self.input_dir}")
|
||
logger.info(f"输出目录: {self.output_dir}")
|
||
|
||
def check_dependencies(self) -> bool:
|
||
"""检查依赖是否可用"""
|
||
try:
|
||
# 检查ffmpeg
|
||
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
|
||
logger.info("✅ ffmpeg 可用")
|
||
|
||
# 检查Python依赖
|
||
import decord
|
||
import numpy
|
||
import tqdm
|
||
import soundfile
|
||
logger.info("✅ Python依赖包可用")
|
||
|
||
return True
|
||
|
||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||
logger.error("❌ ffmpeg 未安装或不可用")
|
||
logger.info("请安装 ffmpeg: sudo apt install ffmpeg")
|
||
return False
|
||
except ImportError as e:
|
||
logger.error(f"❌ Python依赖包缺失: {e}")
|
||
logger.info("请安装依赖: pip install -r requirements.txt")
|
||
return False
|
||
|
||
def process_videos(self) -> Tuple[int, int, int]:
|
||
"""
|
||
处理视频:先切割,再提取音频
|
||
|
||
Returns:
|
||
Tuple[int, int, int]: (成功处理的视频数, 切割的片段数, 提取的音频数)
|
||
"""
|
||
logger.info("=" * 60)
|
||
logger.info("开始视频处理流程")
|
||
logger.info("=" * 60)
|
||
|
||
# 检查依赖
|
||
if not self.check_dependencies():
|
||
return 0, 0, 0
|
||
|
||
# 获取视频文件列表
|
||
logger.info("步骤1: 扫描视频文件")
|
||
video_files = get_video_files(str(self.input_dir))
|
||
|
||
if not video_files:
|
||
logger.warning("没有找到视频文件")
|
||
return 0, 0, 0
|
||
|
||
logger.info(f"找到 {len(video_files)} 个视频文件")
|
||
|
||
# 统计信息
|
||
success_videos = 0
|
||
total_segments = 0
|
||
total_audio_files = 0
|
||
|
||
# 处理每个视频
|
||
for i, video_file in enumerate(video_files, 1):
|
||
logger.info(f"\n{'='*40}")
|
||
logger.info(f"处理第 {i}/{len(video_files)} 个视频: {video_file.name}")
|
||
logger.info(f"{'='*40}")
|
||
|
||
try:
|
||
# 步骤2: 切割视频
|
||
logger.info("步骤2: 切割视频")
|
||
if self._split_single_video(video_file):
|
||
success_videos += 1
|
||
|
||
# 步骤3: 提取音频
|
||
logger.info("步骤3: 提取音频")
|
||
segments_count, audio_count = self._extract_audio_from_segments(video_file)
|
||
total_segments += segments_count
|
||
total_audio_files += audio_count
|
||
else:
|
||
logger.error(f"视频 {video_file.name} 处理失败")
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理视频 {video_file.name} 时出错: {e}")
|
||
|
||
# 输出统计信息
|
||
logger.info(f"\n{'='*60}")
|
||
logger.info("处理完成!")
|
||
logger.info(f"{'='*60}")
|
||
logger.info(f"成功处理的视频数: {success_videos}/{len(video_files)}")
|
||
logger.info(f"总切割片段数: {total_segments}")
|
||
logger.info(f"总音频文件数: {total_audio_files}")
|
||
logger.info(f"输出目录: {self.output_dir}")
|
||
|
||
return success_videos, total_segments, total_audio_files
|
||
|
||
def _split_single_video(self, video_file: Path) -> bool:
|
||
"""
|
||
切割单个视频
|
||
|
||
Args:
|
||
video_file: 视频文件路径
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
# 为每个视频创建独立的输出目录结构
|
||
video_name = video_file.stem
|
||
video_output_dir = self.output_dir / video_name
|
||
video_split_dir = video_output_dir / "video_split"
|
||
|
||
# 创建目录
|
||
video_split_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 调用split_video,但传入video_split_dir作为输出目录
|
||
success = split_video(video_file, str(video_split_dir), self.segment_duration)
|
||
|
||
# 如果视频不足30秒,split_video会将原视频复制到video_split_dir
|
||
# 如果视频超过30秒,split_video会将片段保存到video_split_dir
|
||
# 两种情况都符合我们的期望
|
||
|
||
return success
|
||
except Exception as e:
|
||
logger.error(f"切割视频失败: {e}")
|
||
return False
|
||
|
||
def _extract_audio_from_segments(self, original_video: Path) -> Tuple[int, int]:
|
||
"""
|
||
从视频片段中提取音频
|
||
|
||
Args:
|
||
original_video: 原始视频文件路径
|
||
|
||
Returns:
|
||
Tuple[int, int]: (片段数, 音频文件数)
|
||
"""
|
||
video_name = original_video.stem
|
||
video_output_dir = self.output_dir / video_name
|
||
video_split_dir = video_output_dir / "video_split"
|
||
audio_split_dir = video_output_dir / "audio_split"
|
||
|
||
if not video_split_dir.exists():
|
||
logger.warning(f"视频片段目录不存在: {video_split_dir}")
|
||
return 0, 0
|
||
|
||
# 获取所有视频片段
|
||
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm']
|
||
segment_files = []
|
||
|
||
for ext in video_extensions:
|
||
segment_files.extend(video_split_dir.glob(f"*{ext}"))
|
||
|
||
if not segment_files:
|
||
logger.warning(f"没有找到视频片段: {video_split_dir}")
|
||
return 0, 0
|
||
|
||
logger.info(f"找到 {len(segment_files)} 个视频片段")
|
||
|
||
# 创建音频输出目录
|
||
audio_split_dir.mkdir(exist_ok=True)
|
||
|
||
# 提取音频
|
||
audio_count = 0
|
||
for segment_file in segment_files:
|
||
try:
|
||
logger.info(f"提取音频: {segment_file.name}")
|
||
|
||
# 创建音频提取器,输出到audio_split目录
|
||
segment_audio_extractor = Video2AudioExtractor(str(audio_split_dir))
|
||
video_out, audio_out = segment_audio_extractor.extract_audio_from_video(
|
||
str(segment_file), self.audio_format
|
||
)
|
||
|
||
if audio_out:
|
||
audio_count += 1
|
||
logger.info(f"音频提取成功: {Path(audio_out).name}")
|
||
else:
|
||
logger.warning(f"音频提取失败: {segment_file.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"提取音频时出错: {e}")
|
||
|
||
logger.info(f"音频提取完成: {audio_count}/{len(segment_files)} 个片段")
|
||
return len(segment_files), audio_count
|
||
|
||
def get_output_structure(self) -> dict:
|
||
"""
|
||
获取输出目录结构信息
|
||
|
||
Returns:
|
||
dict: 目录结构信息
|
||
"""
|
||
structure = {
|
||
"output_dir": str(self.output_dir),
|
||
"videos": []
|
||
}
|
||
|
||
# 统计每个视频的目录结构
|
||
if self.output_dir.exists():
|
||
for video_dir in self.output_dir.iterdir():
|
||
if video_dir.is_dir():
|
||
video_info = {
|
||
"name": video_dir.name,
|
||
"video_split_count": 0,
|
||
"audio_split_count": 0
|
||
}
|
||
|
||
# 检查video_split目录
|
||
video_split_dir = video_dir / "video_split"
|
||
if video_split_dir.exists():
|
||
segments = list(video_split_dir.glob("*.mp4"))
|
||
video_info["video_split_count"] = len(segments)
|
||
|
||
# 检查audio_split目录
|
||
audio_split_dir = video_dir / "audio_split"
|
||
if audio_split_dir.exists():
|
||
audio_files = list(audio_split_dir.glob(f"*.{self.audio_format}"))
|
||
video_info["audio_split_count"] = len(audio_files)
|
||
|
||
structure["videos"].append(video_info)
|
||
|
||
return structure
|
||
|
||
def main():
|
||
"""主函数"""
|
||
parser = argparse.ArgumentParser(description="视频处理整合工具 - 先切割再提取音频")
|
||
parser.add_argument("--input_dir", "-i", default="/root/autodl-tmp/video",
|
||
help="输入视频目录 (默认: /root/autodl-tmp/video)")
|
||
parser.add_argument("--output_dir", "-o", default="/root/autodl-tmp/video_processed",
|
||
help="输出目录 (默认: /root/autodl-tmp/video_processed)")
|
||
parser.add_argument("--duration", "-d", type=int, default=30,
|
||
help="每个片段的时长(秒),默认30秒")
|
||
parser.add_argument("--audio_format", "-f", default="wav",
|
||
choices=["wav", "flac", "ogg"], help="音频格式 (默认: wav)")
|
||
parser.add_argument("--verbose", "-v", action="store_true",
|
||
help="显示详细日志")
|
||
parser.add_argument("--check_only", action="store_true",
|
||
help="仅检查输出目录结构,不进行处理")
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.verbose:
|
||
logging.getLogger().setLevel(logging.DEBUG)
|
||
|
||
# 创建视频处理器
|
||
processor = VideoProcessor(
|
||
input_dir=args.input_dir,
|
||
output_dir=args.output_dir,
|
||
segment_duration=args.duration,
|
||
audio_format=args.audio_format
|
||
)
|
||
|
||
if args.check_only:
|
||
# 仅检查输出结构
|
||
structure = processor.get_output_structure()
|
||
print("\n📁 输出目录结构:")
|
||
print(f"主输出目录: {structure['output_dir']}")
|
||
|
||
if structure['videos']:
|
||
print(f"\n📹 视频处理统计:")
|
||
for video in structure['videos']:
|
||
print(f" {video['name']}:")
|
||
print(f" 📹 video_split: {video['video_split_count']} 个片段")
|
||
print(f" 🎵 audio_split: {video['audio_split_count']} 个音频文件")
|
||
else:
|
||
print("📁 输出目录为空,没有处理过的视频")
|
||
else:
|
||
# 执行完整的处理流程
|
||
success_videos, total_segments, total_audio = processor.process_videos()
|
||
|
||
if success_videos > 0:
|
||
print(f"\n✅ 处理完成!")
|
||
print(f"成功处理: {success_videos} 个视频")
|
||
print(f"切割片段: {total_segments} 个")
|
||
print(f"音频文件: {total_audio} 个")
|
||
print(f"输出目录: {args.output_dir}")
|
||
else:
|
||
print(f"\n❌ 处理失败,请检查输入目录和依赖")
|
||
|
||
if __name__ == "__main__":
|
||
main() |