video_template_gen/code/video_processor.py

323 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频处理整合工具
先进行视频切割,再转换为音频
"""
import os
import sys
import argparse
import subprocess
from pathlib import Path
import logging
from typing import List, Tuple
# 导入自定义模块
from video_splitter import get_video_files, split_video, batch_split_videos
from video2audio import Video2AudioExtractor
# Configure logging once at import time: INFO level, with each record
# formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class VideoProcessor:
    """Split videos into segments, then extract audio from every segment.

    For an input video whose file stem is ``<name>`` the processor uses::

        <output_dir>/<name>/video_split/   passed to ``split_video`` as its output dir
        <output_dir>/<name>/audio_split/   passed to ``Video2AudioExtractor``
    """

    # File extensions (lower-case, with leading dot) treated as video segments.
    # Shared by audio extraction and by the structure report so that both
    # always agree on what counts as a segment.
    VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm')

    def __init__(self, input_dir: str, output_dir: str, segment_duration: int = 30, audio_format: str = "wav"):
        """Store the configuration and create the root output directory.

        Args:
            input_dir: Directory that is scanned for input videos.
            output_dir: Root directory for all results; created if missing.
            segment_duration: Length of each segment, in seconds.
            audio_format: Format name handed to the audio extractor; also
                used as the file extension when counting audio outputs.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.segment_duration = segment_duration
        self.audio_format = audio_format

        # Make sure the root output directory exists before any work starts.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        logger.info("视频处理器初始化完成")
        logger.info(f"输入目录: {self.input_dir}")
        logger.info(f"输出目录: {self.output_dir}")

    def check_dependencies(self) -> bool:
        """Check that ffmpeg and the required Python packages are usable.

        Returns:
            bool: True when ``ffmpeg -version`` runs successfully and
            ``decord``, ``numpy``, ``tqdm`` and ``soundfile`` can all be
            imported; False otherwise (the reason is logged).
        """
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # one that exists but cannot be executed (PermissionError).
            logger.error("❌ ffmpeg 未安装或不可用")
            logger.info("请安装 ffmpeg: sudo apt install ffmpeg")
            return False
        logger.info("✅ ffmpeg 可用")

        try:
            # Imported only to prove they are installed; the names are unused.
            import decord  # noqa: F401
            import numpy  # noqa: F401
            import tqdm  # noqa: F401
            import soundfile  # noqa: F401
        except (ImportError, OSError) as e:
            # soundfile raises a plain OSError (not ImportError) when the
            # native libsndfile library cannot be located.
            logger.error(f"❌ Python依赖包缺失: {e}")
            logger.info("请安装依赖: pip install -r requirements.txt")
            return False
        logger.info("✅ Python依赖包可用")
        return True

    def process_videos(self) -> Tuple[int, int, int]:
        """Run the whole pipeline: split every video, then extract its audio.

        A failure on one video is logged and does not stop the others.

        Returns:
            Tuple[int, int, int]: ``(videos split successfully, segments
            found, audio files extracted)``. ``(0, 0, 0)`` when dependencies
            are missing or no input video is found.
        """
        logger.info("=" * 60)
        logger.info("开始视频处理流程")
        logger.info("=" * 60)

        if not self.check_dependencies():
            return 0, 0, 0

        # Step 1: collect the input videos.
        logger.info("步骤1: 扫描视频文件")
        video_files = get_video_files(str(self.input_dir))
        if not video_files:
            logger.warning("没有找到视频文件")
            return 0, 0, 0
        logger.info(f"找到 {len(video_files)} 个视频文件")

        success_videos = 0
        total_segments = 0
        total_audio_files = 0

        for i, video_file in enumerate(video_files, 1):
            logger.info(f"\n{'='*40}")
            logger.info(f"处理第 {i}/{len(video_files)} 个视频: {video_file.name}")
            logger.info(f"{'='*40}")
            try:
                # Step 2: split the video into segments.
                logger.info("步骤2: 切割视频")
                if not self._split_single_video(video_file):
                    logger.error(f"视频 {video_file.name} 处理失败")
                    continue
                success_videos += 1

                # Step 3: extract audio from the segments just written.
                logger.info("步骤3: 提取音频")
                segments_count, audio_count = self._extract_audio_from_segments(video_file)
                total_segments += segments_count
                total_audio_files += audio_count
            except Exception as e:
                # Boundary handler: keep going with the remaining videos.
                logger.error(f"处理视频 {video_file.name} 时出错: {e}")

        logger.info(f"\n{'='*60}")
        logger.info("处理完成!")
        logger.info(f"{'='*60}")
        logger.info(f"成功处理的视频数: {success_videos}/{len(video_files)}")
        logger.info(f"总切割片段数: {total_segments}")
        logger.info(f"总音频文件数: {total_audio_files}")
        logger.info(f"输出目录: {self.output_dir}")
        return success_videos, total_segments, total_audio_files

    def _split_single_video(self, video_file: Path) -> bool:
        """Split one video into ``<output_dir>/<stem>/video_split``.

        Args:
            video_file: Path of the video to split.

        Returns:
            bool: The value returned by ``split_video``, or False when an
            exception was raised (the exception is logged).
        """
        try:
            video_split_dir = self.output_dir / video_file.stem / "video_split"
            video_split_dir.mkdir(parents=True, exist_ok=True)
            # NOTE(review): according to the original author's note,
            # split_video copies a video shorter than the segment length into
            # the output directory unchanged and writes the segments there
            # otherwise -- confirm against video_splitter.split_video.
            return split_video(video_file, str(video_split_dir), self.segment_duration)
        except Exception as e:
            logger.error(f"切割视频失败: {e}")
            return False

    @classmethod
    def _list_segments(cls, directory: Path) -> List[Path]:
        """Return the video files directly inside *directory*, sorted by path.

        Only regular files whose extension (compared case-insensitively) is
        in ``VIDEO_EXTENSIONS`` are returned.
        """
        return sorted(
            entry for entry in directory.iterdir()
            if entry.is_file() and entry.suffix.lower() in cls.VIDEO_EXTENSIONS
        )

    def _extract_audio_from_segments(self, original_video: Path) -> Tuple[int, int]:
        """Extract audio from every segment previously produced for a video.

        Args:
            original_video: Path of the original (unsplit) video; only its
                stem is used, to locate the per-video output directory.

        Returns:
            Tuple[int, int]: ``(segments found, audio files extracted)``.
            ``(0, 0)`` when the segment directory is missing or empty.
        """
        video_output_dir = self.output_dir / original_video.stem
        video_split_dir = video_output_dir / "video_split"
        audio_split_dir = video_output_dir / "audio_split"

        if not video_split_dir.is_dir():
            logger.warning(f"视频片段目录不存在: {video_split_dir}")
            return 0, 0

        segment_files = self._list_segments(video_split_dir)
        if not segment_files:
            logger.warning(f"没有找到视频片段: {video_split_dir}")
            return 0, 0
        logger.info(f"找到 {len(segment_files)} 个视频片段")

        audio_split_dir.mkdir(parents=True, exist_ok=True)

        audio_count = 0
        for segment_file in segment_files:
            try:
                logger.info(f"提取音频: {segment_file.name}")
                # A fresh extractor per segment, all targeting audio_split.
                extractor = Video2AudioExtractor(str(audio_split_dir))
                # The call returns a pair; a truthy second item is treated as
                # the path of the audio file that was written.
                _, audio_out = extractor.extract_audio_from_video(
                    str(segment_file), self.audio_format
                )
                if audio_out:
                    audio_count += 1
                    logger.info(f"音频提取成功: {Path(audio_out).name}")
                else:
                    logger.warning(f"音频提取失败: {segment_file.name}")
            except Exception as e:
                # One bad segment must not abort the remaining ones.
                logger.error(f"提取音频时出错: {e}")

        logger.info(f"音频提取完成: {audio_count}/{len(segment_files)} 个片段")
        return len(segment_files), audio_count

    def get_output_structure(self) -> dict:
        """Summarise what currently exists under the output directory.

        Returns:
            dict: ``{"output_dir": str, "videos": [...]}`` where each entry
            of ``videos`` is ``{"name", "video_split_count",
            "audio_split_count"}`` for one sub-directory of the output
            directory, ordered by path. Video segments are counted with the
            same extension set used for audio extraction; audio files are
            counted by the ``*.<audio_format>`` pattern.
        """
        structure = {
            "output_dir": str(self.output_dir),
            "videos": []
        }
        if not self.output_dir.is_dir():
            return structure

        # Sorted so the report is stable from one run to the next.
        for video_dir in sorted(self.output_dir.iterdir()):
            if not video_dir.is_dir():
                continue
            video_info = {
                "name": video_dir.name,
                "video_split_count": 0,
                "audio_split_count": 0
            }

            video_split_dir = video_dir / "video_split"
            if video_split_dir.is_dir():
                video_info["video_split_count"] = len(self._list_segments(video_split_dir))

            audio_split_dir = video_dir / "audio_split"
            if audio_split_dir.is_dir():
                audio_files = list(audio_split_dir.glob(f"*.{self.audio_format}"))
                video_info["audio_split_count"] = len(audio_files)

            structure["videos"].append(video_info)
        return structure
def _build_arg_parser():
    """Create the command-line parser for the split-then-extract tool."""
    cli = argparse.ArgumentParser(description="视频处理整合工具 - 先切割再提取音频")
    cli.add_argument(
        "--input_dir", "-i",
        default="/root/autodl-tmp/video",
        help="输入视频目录 (默认: /root/autodl-tmp/video)",
    )
    cli.add_argument(
        "--output_dir", "-o",
        default="/root/autodl-tmp/video_processed",
        help="输出目录 (默认: /root/autodl-tmp/video_processed)",
    )
    cli.add_argument(
        "--duration", "-d",
        type=int,
        default=30,
        help="每个片段的时长默认30秒",
    )
    cli.add_argument(
        "--audio_format", "-f",
        default="wav",
        choices=["wav", "flac", "ogg"],
        help="音频格式 (默认: wav)",
    )
    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="显示详细日志",
    )
    cli.add_argument(
        "--check_only",
        action="store_true",
        help="仅检查输出目录结构,不进行处理",
    )
    return cli


def main():
    """Command-line entry point.

    Parses the arguments, builds a ``VideoProcessor`` and then either prints
    a summary of the existing output directory (``--check_only``) or runs
    the full split-and-extract pipeline and prints its totals.
    """
    opts = _build_arg_parser().parse_args()

    # --verbose raises the root logger to DEBUG.
    if opts.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    processor = VideoProcessor(
        input_dir=opts.input_dir,
        output_dir=opts.output_dir,
        segment_duration=opts.duration,
        audio_format=opts.audio_format
    )

    if not opts.check_only:
        # Full pipeline: split, extract, then report the totals.
        ok_count, segment_total, audio_total = processor.process_videos()
        if ok_count <= 0:
            print("\n❌ 处理失败,请检查输入目录和依赖")
            return
        print("\n✅ 处理完成!")
        print(f"成功处理: {ok_count} 个视频")
        print(f"切割片段: {segment_total}")
        print(f"音频文件: {audio_total}")
        print(f"输出目录: {opts.output_dir}")
        return

    # Report-only mode: describe what is already in the output directory.
    report = processor.get_output_structure()
    print("\n📁 输出目录结构:")
    print(f"主输出目录: {report['output_dir']}")
    entries = report['videos']
    if not entries:
        print("📁 输出目录为空,没有处理过的视频")
        return
    print("\n📹 视频处理统计:")
    for entry in entries:
        print(f" {entry['name']}:")
        print(f" 📹 video_split: {entry['video_split_count']} 个片段")
        print(f" 🎵 audio_split: {entry['audio_split_count']} 个音频文件")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()