hot_video_analyse/code/VideoSplitter.py

525 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import os
import subprocess
import shutil
from datetime import timedelta
import argparse
from sklearn.metrics.pairwise import cosine_similarity
from skimage.metrics import structural_similarity as ssim
# 设置固定的输入输出路径
INPUT_VIDEO_PATH = "/root/autodl-tmp/hot_video_analyse/source/sample_demo_1.mp4" # 修改为视频文件夹路径
OUTPUT_DIR = "/root/autodl-tmp/hot_video_analyse/source/Splitter" # 请修改为您的实际输出目录路径
# 支持的视频格式
VIDEO_EXTENSIONS = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']
# 设置参数
SAMPLE_RATE = 1 # 帧采样率
METHOD = "ssim" # 比较方法,可选 "ssim" 或 "cosine"
## 有分镜0.2 无分镜0.7
THRESHOLD = 0.8 # 相似度阈值
# 启用详细日志输出
VERBOSE = True
# FFMPEG可能的路径
FFMPEG_PATHS = [
'ffmpeg', # 系统路径
'/usr/bin/ffmpeg', # 常见Linux路径
'/usr/local/bin/ffmpeg', # 常见macOS路径
'C:\\ffmpeg\\bin\\ffmpeg.exe', # 常见Windows路径
]
def find_ffmpeg():
"""查找系统中可用的ffmpeg路径"""
# 首先尝试使用which/where命令
try:
if os.name == 'nt': # Windows
result = subprocess.run(['where', 'ffmpeg'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
return result.stdout.strip().split('\n')[0]
else: # Linux/Mac
result = subprocess.run(['which', 'ffmpeg'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
# 然后检查预定义的路径
for path in FFMPEG_PATHS:
if shutil.which(path):
return path
return None
def extract_frames(video_path, output_dir, sample_rate=1):
"""
从视频中提取帧并保存到指定目录
Args:
video_path: 视频文件路径
output_dir: 输出帧的目录
sample_rate: 采样率每N帧提取一帧
Returns:
frames_info: 包含帧信息的列表 [(frame_number, timestamp, frame_path), ...]
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 打开视频
cap = cv2.VideoCapture(video_path)
# 获取视频属性
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps
print(f"视频信息:{frame_count}帧, {fps}fps, 时长:{timedelta(seconds=duration)}")
frames_info = []
frame_number = 0
saved_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
if frame_number % sample_rate == 0:
# 计算时间戳(秒)
timestamp = frame_number / fps
# 保存帧
frame_path = os.path.join(output_dir, f"frame_{saved_count:05d}.jpg")
cv2.imwrite(frame_path, frame)
# 记录帧信息
frames_info.append((frame_number, timestamp, frame_path))
saved_count += 1
frame_number += 1
# 显示进度
if frame_number % 100 == 0:
print(f"处理进度: {frame_number}/{frame_count} ({frame_number/frame_count*100:.2f}%)")
cap.release()
print(f"共提取了 {saved_count}")
return frames_info
def compare_frames(frame1_path, frame2_path, method='ssim', threshold=0.85):
"""
比较两帧的相似度
Args:
frame1_path: 第一帧路径
frame2_path: 第二帧路径
method: 比较方法,'ssim''cosine'
threshold: 相似度阈值
Returns:
is_similar: 是否相似
similarity: 相似度值
"""
# 读取帧
frame1 = cv2.imread(frame1_path)
frame2 = cv2.imread(frame2_path)
# 调整大小以加快处理
frame1_resized = cv2.resize(frame1, (320, 180))
frame2_resized = cv2.resize(frame2, (320, 180))
# 转换为灰度图
gray1 = cv2.cvtColor(frame1_resized, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2_resized, cv2.COLOR_BGR2GRAY)
if method == 'ssim':
# 使用结构相似性指数
similarity, _ = ssim(gray1, gray2, full=True)
else: # cosine
# 使用余弦相似度
flat1 = gray1.flatten().reshape(1, -1)
flat2 = gray2.flatten().reshape(1, -1)
similarity = cosine_similarity(flat1, flat2)[0][0]
is_similar = similarity >= threshold
return is_similar, similarity
def detect_scene_changes(frames_info, method='ssim', threshold=0.85):
"""
检测场景变化
Args:
frames_info: 帧信息列表
method: 比较方法
threshold: 相似度阈值
Returns:
scenes: 场景信息列表 [(start_frame, end_frame, start_time, end_time), ...]
"""
global SCENE_START_FRAMES # 声明使用全局变量
if len(frames_info) < 2:
return []
scenes = []
clips = []
scene_start = frames_info[0]
for i in range(1, len(frames_info)):
prev_frame_path = frames_info[i-1][2]
curr_frame_path = frames_info[i][2]
is_similar, similarity = compare_frames(
prev_frame_path, curr_frame_path, method, threshold
)
if not is_similar:
# 场景变化,记录上一个场景
scene_end = frames_info[i-1]
scene_duration = scene_end[1] - scene_start[1]
clips.append(scene_start[0])
# 只记录持续时间超过1秒的场景
if scene_duration >= 0.2:
scenes.append((
scene_start[0], # 开始帧号
scene_end[0], # 结束帧号
scene_start[1], # 开始时间
scene_end[1], # 结束时间
scene_duration # 持续时间
))
# 开始新场景
scene_start = frames_info[i]
if VERBOSE:
print(f"检测到场景变化点: 帧 {scene_end[0]}, 时间 {timedelta(seconds=scene_end[1])}, 相似度: {similarity:.4f}")
# 添加最后一个场景
scene_end = frames_info[-1]
scene_duration = scene_end[1] - scene_start[1]
if scene_duration >= 0.2:
scenes.append((
scene_start[0],
scene_end[0],
scene_start[1],
scene_end[1],
scene_duration
))
print(f"\n场景变化统计:")
print(f"检测到 {len(scenes)} 个场景, 平均时长: {sum(s[4] for s in scenes)/max(1, len(scenes)):.2f}")
return scenes , clips
def extract_video_clips(video_path, scenes, output_dir, ffmpeg_path=None):
"""
根据场景信息提取视频片段
Args:
video_path: 视频路径
scenes: 场景信息列表
output_dir: 输出目录
ffmpeg_path: ffmpeg可执行文件路径
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 检查ffmpeg是否可用
if ffmpeg_path is None:
ffmpeg_path = find_ffmpeg()
if ffmpeg_path is None:
print("错误: 找不到ffmpeg。请安装ffmpeg并确保它在系统路径中。")
print("您可以从 https://ffmpeg.org/download.html 下载ffmpeg")
print("或使用包管理器安装: apt-get install ffmpeg / brew install ffmpeg 等")
return []
print(f"\n开始切割视频: {video_path}")
print(f"输出目录: {output_dir}")
print(f"使用ffmpeg: {ffmpeg_path}")
print("-" * 60)
clips_info = []
for i, scene in enumerate(scenes):
start_time = scene[2]
end_time = scene[3]
duration = scene[4]
output_file = os.path.join(output_dir, f"clip_{i:03d}_{duration:.2f}s.mp4")
# 构建ffmpeg命令
cmd = [
ffmpeg_path,
'-i', video_path,
'-ss', f"{start_time:.2f}",
'-to', f"{end_time:.2f}",
'-c:v', 'libx264',
'-c:a', 'aac',
'-y', # 覆盖已存在的文件
output_file
]
try:
print(f"\n切割片段 {i+1}/{len(scenes)}:")
print(f" 开始时间: {timedelta(seconds=start_time)}")
print(f" 结束时间: {timedelta(seconds=end_time)}")
print(f" 时长: {duration:.2f}")
print(f" 输出文件: {os.path.basename(output_file)}")
if VERBOSE:
print(f" 执行命令: {' '.join(cmd)}")
# 使用subprocess执行命令可以获取输出和错误
result = subprocess.run(
cmd,
stdout=subprocess.PIPE if not VERBOSE else None,
stderr=subprocess.PIPE,
text=True
)
if result.returncode == 0:
# 获取输出文件信息
file_size = os.path.getsize(output_file) / (1024 * 1024) # MB
print(f" ✓ 切割成功: {os.path.basename(output_file)} ({file_size:.2f} MB)")
clips_info.append({
'index': i,
'file': output_file,
'start': start_time,
'end': end_time,
'duration': duration,
'size_mb': file_size
})
else:
print(f" ✗ 切割失败: {result.stderr}")
except Exception as e:
print(f" ✗ 切割失败: {str(e)}")
# 汇总信息
if clips_info:
total_size = sum(clip['size_mb'] for clip in clips_info)
total_duration = sum(clip['duration'] for clip in clips_info)
print("\n切割完成汇总:")
print(f" 成功切割片段数: {len(clips_info)}/{len(scenes)}")
print(f" 总时长: {timedelta(seconds=total_duration)}")
print(f" 总文件大小: {total_size:.2f} MB")
# 列出所有文件
print("\n输出文件列表:")
for clip in clips_info:
print(f" {os.path.basename(clip['file'])} - {clip['duration']:.2f}秒 ({clip['size_mb']:.2f} MB)")
else:
print("\n没有成功切割任何片段")
return clips_info
def get_video_files(directory):
"""
获取目录中所有视频文件
Args:
directory: 目录路径
Returns:
视频文件路径列表
"""
video_files = []
# 检查是否是单个文件
if os.path.isfile(directory):
ext = os.path.splitext(directory)[1].lower()
if ext in VIDEO_EXTENSIONS:
return [directory]
# 遍历目录
for root, _, files in os.walk(directory):
for file in files:
# 检查文件扩展名
ext = os.path.splitext(file)[1].lower()
if ext in VIDEO_EXTENSIONS:
video_files.append(os.path.join(root, file))
return video_files
def process_video(video_path, output_base_dir, sample_rate, method, threshold, ffmpeg_path):
"""
处理单个视频文件
Args:
video_path: 视频文件路径
output_base_dir: 基础输出目录
sample_rate: 帧采样率
method: 比较方法
threshold: 相似度阈值
ffmpeg_path: ffmpeg路径
Returns:
处理是否成功
"""
# 获取视频文件名(不含扩展名)
video_filename = os.path.splitext(os.path.basename(video_path))[0]
# 为当前视频创建输出目录
video_output_dir = os.path.join(output_base_dir, video_filename)
if not os.path.exists(video_output_dir):
os.makedirs(video_output_dir)
# 创建输出子目录
frames_dir = os.path.join(video_output_dir, 'frames')
clips_dir = os.path.join(video_output_dir, 'clips')
if not os.path.exists(frames_dir):
os.makedirs(frames_dir)
if not os.path.exists(clips_dir):
os.makedirs(clips_dir)
print("\n处理参数:")
print(f"输入视频: {os.path.abspath(video_path)}")
print(f"输出目录: {os.path.abspath(video_output_dir)}")
print(f"帧采样率: 每{sample_rate}")
print(f"比较方法: {method}")
print(f"相似度阈值: {threshold}")
print("-" * 60)
try:
# 步骤1: 提取帧
print("\n步骤1: 正在提取视频帧...")
frames_info = extract_frames(video_path, frames_dir, sample_rate)
# 步骤2: 检测场景变化
print("\n步骤2: 正在检测场景变化...")
scenes = detect_scene_changes(frames_info, method, threshold)
# 输出场景信息
print(f"\n检测到 {len(scenes)} 个场景:")
for i, scene in enumerate(scenes):
start_time = timedelta(seconds=scene[2])
end_time = timedelta(seconds=scene[3])
duration = scene[4]
print(f"场景 {i+1}: {start_time} - {end_time} (时长: {duration:.2f}s)")
# 如果没有ffmpeg提前报错
if not ffmpeg_path:
print("\n错误: 缺少ffmpeg无法继续视频切割步骤。")
print("请安装ffmpeg后重试。")
return False
# 步骤3: 提取视频片段
print("\n步骤3: 正在提取视频片段...")
clips_info = extract_video_clips(video_path, scenes, clips_dir, ffmpeg_path)
# 创建结果摘要文件
summary_file = os.path.join(video_output_dir, 'summary.txt')
try:
with open(summary_file, 'w', encoding='utf-8') as f:
f.write("智能视频切割结果摘要\n")
f.write("=" * 40 + "\n\n")
f.write(f"输入视频: {os.path.abspath(video_path)}\n")
f.write(f"处理时间: {os.path.getmtime(summary_file)}\n\n")
f.write(f"检测到场景数: {len(scenes)}\n")
f.write(f"生成片段数: {len(clips_info)}\n\n")
f.write("片段详情:\n")
for i, clip in enumerate(clips_info):
f.write(f"{i+1}. {os.path.basename(clip['file'])}\n")
f.write(f" 开始: {timedelta(seconds=clip['start'])}\n")
f.write(f" 结束: {timedelta(seconds=clip['end'])}\n")
f.write(f" 时长: {clip['duration']:.2f}\n")
f.write(f" 大小: {clip['size_mb']:.2f} MB\n\n")
print(f"\n已保存处理摘要到: {summary_file}")
except Exception as e:
print(f"保存摘要文件失败: {str(e)}")
print("\n处理完成!")
print(f"帧提取目录: {os.path.abspath(frames_dir)}")
print(f"视频片段目录: {os.path.abspath(clips_dir)}")
return True
except Exception as e:
print(f"\n处理视频 {video_path} 时发生错误: {str(e)}")
return False
def get_parent_folder_name(path):
"""
获取路径中 'video' 上一级文件夹的名字
"""
abs_path = os.path.abspath(path)
# 如果是文件夹,直接用
if os.path.isdir(abs_path):
parent = os.path.dirname(abs_path.rstrip('/'))
folder_name = os.path.basename(parent)
else:
# 如果是文件,取其父目录的父目录
parent = os.path.dirname(os.path.dirname(abs_path))
folder_name = os.path.basename(parent)
return folder_name
def main():
# 欢迎信息
print("=" * 60)
print("智能视频切割工具 - 批量处理版")
print("=" * 60)
# 查找ffmpeg
ffmpeg_path = find_ffmpeg()
if ffmpeg_path:
print(f"已找到ffmpeg: {ffmpeg_path}")
else:
print("警告: 未找到ffmpeg视频切割功能将不可用")
print("请安装ffmpeg并确保它在系统路径中")
# 获取输入目录中的所有视频文件
video_files = get_video_files(INPUT_VIDEO_PATH)
if not video_files:
print(f"错误: 在 '{INPUT_VIDEO_PATH}' 中没有找到视频文件")
print(f"支持的视频格式: {', '.join(VIDEO_EXTENSIONS)}")
return
# 自动获取video上一级文件夹名
parent_folder_name = get_parent_folder_name(INPUT_VIDEO_PATH)
output_dir = os.path.join(OUTPUT_DIR, parent_folder_name)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"\n输出目录: {output_dir}")
# 处理每个视频文件
successful = 0
failed = 0
for i, video_path in enumerate(video_files):
print("\n" + "=" * 60)
print(f"正在处理视频 [{i+1}/{len(video_files)}]: {os.path.basename(video_path)}")
print("=" * 60)
success = process_video(
video_path=video_path,
output_base_dir=output_dir,
sample_rate=SAMPLE_RATE,
method=METHOD,
threshold=THRESHOLD,
ffmpeg_path=ffmpeg_path
)
if success:
successful += 1
else:
failed += 1
# 打印批量处理总结
print("\n" + "=" * 60)
print("批量处理完成!")
print("=" * 60)
print(f"总共处理: {len(video_files)} 个视频文件")
print(f"成功: {successful}")
print(f"失败: {failed}")
print(f"输出目录: {os.path.abspath(output_dir)}")
if __name__ == "__main__":
main()