video_template_gen/code/video_splitter.py

193 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频批量切割工具
使用decord将文件夹中的所有视频切割成30秒片段
"""
import os
import sys
import argparse
import subprocess
from pathlib import Path
import decord
from decord import VideoReader
import numpy as np
from tqdm import tqdm
import logging
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 支持的视频格式
SUPPORTED_FORMATS = {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.webm', '.m4v'}
def get_video_files(input_dir):
"""获取输入目录中的所有视频文件"""
video_files = []
input_path = Path(input_dir)
if not input_path.exists():
logger.error(f"输入目录不存在: {input_dir}")
return []
for file_path in input_path.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in SUPPORTED_FORMATS:
video_files.append(file_path)
logger.info(f"找到 {len(video_files)} 个视频文件")
return video_files
def split_video(video_path, output_dir, segment_duration=30):
"""
切割单个视频文件
Args:
video_path: 视频文件路径
output_dir: 输出目录
segment_duration: 每个片段的时长(秒)
"""
try:
# 直接使用指定的输出目录,不创建子目录
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 读取视频
vr = VideoReader(str(video_path))
fps = vr.get_avg_fps()
total_frames = len(vr)
total_duration = total_frames / fps
logger.info(f"处理视频: {video_path.name}")
logger.info(f" 总时长: {total_duration:.2f}秒, 帧率: {fps:.2f}fps, 总帧数: {total_frames}")
# 如果视频长度不足30秒直接复制
if total_duration <= segment_duration:
logger.info(f" 视频长度不足{segment_duration}秒,保持原样")
import shutil
output_file = output_path / video_path.name
shutil.copy2(video_path, output_file)
return True
# 计算需要切割的片段数
frames_per_segment = int(segment_duration * fps)
num_segments = int(np.ceil(total_frames / frames_per_segment))
logger.info(f" 将切割成 {num_segments} 个片段")
# 获取视频编码信息
import cv2
cap = cv2.VideoCapture(str(video_path))
fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# 切割视频
for i in tqdm(range(num_segments), desc=f"切割 {video_path.name}"):
start_frame = i * frames_per_segment
end_frame = min((i + 1) * frames_per_segment, total_frames)
# 生成输出文件名
base_name = video_path.stem
output_filename = f"{base_name}_segment_{i+1:03d}.mp4"
output_file = output_path / output_filename
# 使用ffmpeg进行切割更高效
start_time = start_frame / fps
duration = (end_frame - start_frame) / fps
cmd = [
'ffmpeg', '-y', # 覆盖输出文件
'-i', str(video_path), # 输入文件
'-ss', str(start_time), # 开始时间
'-t', str(duration), # 持续时间
'-c', 'copy', # 复制编码(不重新编码,更快)
'-avoid_negative_ts', 'make_zero', # 避免负时间戳
str(output_file) # 输出文件
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
logger.debug(f" 片段 {i+1} 保存为: {output_filename}")
except subprocess.CalledProcessError as e:
logger.error(f" 切割片段 {i+1} 失败: {e}")
logger.error(f" ffmpeg 错误输出: {e.stderr}")
return False
logger.info(f" 视频 {video_path.name} 切割完成")
return True
except Exception as e:
logger.error(f"处理视频 {video_path} 时出错: {e}")
return False
def batch_split_videos(input_dir, output_dir, segment_duration=30):
"""
批量切割视频
Args:
input_dir: 输入目录
output_dir: 输出目录
segment_duration: 每个片段的时长(秒)
"""
# 获取所有视频文件
video_files = get_video_files(input_dir)
if not video_files:
logger.warning("没有找到视频文件")
return
# 创建输出目录
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 统计信息
success_count = 0
failed_count = 0
# 处理每个视频文件
for video_file in video_files:
logger.info(f"\n开始处理: {video_file}")
if split_video(video_file, output_dir, segment_duration):
success_count += 1
else:
failed_count += 1
# 输出统计信息
logger.info(f"\n处理完成!")
logger.info(f"成功处理: {success_count} 个视频")
logger.info(f"处理失败: {failed_count} 个视频")
logger.info(f"输出目录: {output_dir}")
def main():
parser = argparse.ArgumentParser(description='视频批量切割工具')
parser.add_argument('--input_dir', '-i', default="/root/autodl-tmp/video", help='输入视频目录')
parser.add_argument('--output_dir', '-o', default="/root/autodl-tmp/video_split", help='输出目录')
parser.add_argument('--duration', '-d', type=int, default=30,
help='每个片段的时长默认30秒')
parser.add_argument('--verbose', '-v', action='store_true',
help='显示详细日志')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# 检查ffmpeg是否可用
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error("ffmpeg 未安装或不可用,请先安装 ffmpeg")
logger.info("Ubuntu/Debian: sudo apt install ffmpeg")
logger.info("CentOS/RHEL: sudo yum install ffmpeg")
sys.exit(1)
# 开始批量处理
batch_split_videos(args.input_dir, args.output_dir, args.duration)
if __name__ == "__main__":
main()