# hot_video_analyse/VideoSplitter_Enhanced.py
#
# Listing metadata from the web viewer this file was copied from:
# 586 lines, 20 KiB, Python.

import cv2
import numpy as np
import os
import subprocess
import shutil
from datetime import timedelta
import argparse
from sklearn.metrics.pairwise import cosine_similarity
from skimage.metrics import structural_similarity as ssim
from scipy import stats
from collections import deque
import matplotlib.pyplot as plt
# Fixed input and output locations used by main()
INPUT_VIDEO_PATH = "/root/autodl-tmp/kuaishou_demo"
OUTPUT_DIR = "/root/autodl-tmp/02_VideoSplitter/VideoSplitter_output"
# File extensions treated as video files (compared in lower case)
VIDEO_EXTENSIONS = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']
# Settings for the enhanced detector
SAMPLE_RATE = 1
METHOD = "enhanced" # the newly added "enhanced" method
THRESHOLD = 0.5
VERBOSE = True
# Additional parameters
# NOTE(review): only WINDOW_SIZE is referenced by the code visible in this
# file; the three *_THRESHOLD values below appear to be unused -- confirm.
WINDOW_SIZE = 30 # sliding-window size
GRADIENT_THRESHOLD = 0.02 # gradual-transition detection threshold
EDGE_DENSITY_THRESHOLD = 0.3 # edge-density change threshold
COLOR_HIST_THRESHOLD = 0.4 # colour-histogram change threshold
# Candidate ffmpeg locations probed by find_ffmpeg()
FFMPEG_PATHS = [
'ffmpeg',
'/usr/bin/ffmpeg',
'/usr/local/bin/ffmpeg',
'C:\\ffmpeg\\bin\\ffmpeg.exe',
]
def find_ffmpeg():
    """Locate an ffmpeg executable on this machine.

    The platform lookup tool (``where`` on Windows, ``which`` elsewhere) is
    asked first. If it is unavailable or finds nothing, every entry of
    ``FFMPEG_PATHS`` is probed with ``shutil.which``.

    Returns:
        The path (or command name) of the first ffmpeg found, or ``None``
        when no candidate is usable.
    """
    lookup_tool = 'where' if os.name == 'nt' else 'which'
    try:
        result = subprocess.run(
            [lookup_tool, 'ffmpeg'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, subprocess.SubprocessError):
        # The lookup tool itself is missing or could not be started; this is
        # a best-effort probe, so fall through to the candidate list.
        result = None

    if result is not None and result.returncode == 0:
        # `where` may list several matches; keep the first non-empty line so
        # an empty output is never reported as a found path.
        matches = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        if matches:
            return matches[0]

    for path in FFMPEG_PATHS:
        if shutil.which(path):
            return path
    return None
def extract_enhanced_features(frame):
    """Compute the per-frame descriptors consumed by the scene detector.

    Args:
        frame: Image array; it is converted with OpenCV's BGR colour codes,
            so a BGR image (as produced by ``cv2.imread``) is expected.

    Returns:
        dict with the keys ``gray`` (320x180 grayscale image), ``hist_h`` /
        ``hist_s`` / ``hist_v`` (flattened 50-bin HSV channel histograms),
        ``edge_density`` (fraction of Canny edge pixels),
        ``brightness_mean``, ``brightness_std`` and ``texture_energy``
        (mean Sobel gradient magnitude).
    """
    # Work on a small copy to keep the per-frame cost low.
    small = cv2.resize(frame, (320, 180))
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    hsv = cv2.cvtColor(small, cv2.COLOR_BGR2HSV)

    features = {'gray': gray}

    # One histogram per HSV channel; the hue range passed is 0-180, the
    # other two channels use 0-256.
    for key, channel, upper in (('hist_h', 0, 180), ('hist_s', 1, 256), ('hist_v', 2, 256)):
        histogram = cv2.calcHist([hsv], [channel], None, [50], [0, upper])
        features[key] = histogram.flatten()

    # Share of pixels that Canny marks as edges.
    edge_map = cv2.Canny(gray, 50, 150)
    height, width = edge_map.shape[0], edge_map.shape[1]
    features['edge_density'] = np.sum(edge_map > 0) / (height * width)

    # Global brightness statistics.
    features['brightness_mean'] = np.mean(gray)
    features['brightness_std'] = np.std(gray)

    # Texture proxy: average magnitude of the Sobel gradient.
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    features['texture_energy'] = np.mean(np.sqrt(grad_x**2 + grad_y**2))

    return features
def enhanced_frame_similarity(features1, features2):
    """Compare two feature dictionaries from ``extract_enhanced_features``.

    Args:
        features1: Feature dictionary of the first frame.
        features2: Feature dictionary of the second frame.

    Returns:
        dict with the keys ``ssim``, ``hist_h``, ``hist_s``, ``hist_v``
        (histogram correlation), ``edge_stability``,
        ``brightness_stability`` and ``texture_stability``.
    """

    def _capped_stability(key, scale):
        # 1.0 for identical values, falling linearly to 0.0 once the
        # absolute difference reaches `scale`.
        gap = abs(features1[key] - features2[key])
        return 1.0 - min(gap / scale, 1.0)

    # Structural similarity of the grayscale images.
    scores = {'ssim': ssim(features1['gray'], features2['gray'])}

    # Correlation of each HSV channel histogram.
    for channel_key in ('hist_h', 'hist_s', 'hist_v'):
        scores[channel_key] = cv2.compareHist(
            features1[channel_key], features2[channel_key], cv2.HISTCMP_CORREL
        )

    scores['edge_stability'] = _capped_stability('edge_density', 0.5)

    # Brightness difference is scaled by 255 and not capped.
    mean_gap = abs(features1['brightness_mean'] - features2['brightness_mean']) / 255.0
    scores['brightness_stability'] = 1.0 - mean_gap

    scores['texture_stability'] = _capped_stability('texture_energy', 100.0)
    return scores
def detect_transition_type(similarity_window, frame_indices):
    """Classify the transition described by a short run of similarity scores.

    Args:
        similarity_window: Sequence of consecutive similarity scores.
        frame_indices: Frame indices matching the scores (accepted for the
            caller's convenience; not used in the classification).

    Returns:
        dict. With fewer than five scores only ``type`` ('unknown') and
        ``confidence`` (0.0) are present. Otherwise the dict holds
        ``slope``, ``r_squared``, ``max_drop``, ``total_change``, ``std``,
        ``type`` (one of 'fade', 'dissolve', 'cut', 'complex', 'stable') and
        ``confidence``.
    """
    if len(similarity_window) < 5:
        return {'type': 'unknown', 'confidence': 0.0}

    # Linear trend of the scores over their position in the window.
    positions = np.arange(len(similarity_window))
    fit = stats.linregress(positions, similarity_window)
    r_squared = fit.rvalue**2

    # Step-to-step changes; the window has at least five entries here, so
    # there is always at least one step.
    steps = np.diff(similarity_window)
    sharpest_drop = np.min(steps)
    net_change = similarity_window[-1] - similarity_window[0]
    spread = np.std(similarity_window)

    if r_squared > 0.7 and fit.slope < -0.02:
        # Steady linear decline: a gradual transition. Small steps count as
        # a fade, larger ones as a dissolve.
        if abs(sharpest_drop) < 0.1:
            label, confidence = 'fade', 0.8
        else:
            label, confidence = 'dissolve', 0.7
    elif abs(sharpest_drop) > 0.3:
        # One abrupt drop: hard cut.
        label, confidence = 'cut', 0.9
    elif spread > 0.1 and net_change < -0.2:
        # Irregular but clearly falling: treated as a complex transition.
        label, confidence = 'complex', 0.6
    else:
        label, confidence = 'stable', 0.5

    return {
        'slope': fit.slope,
        'r_squared': r_squared,
        'max_drop': sharpest_drop,
        'total_change': net_change,
        'std': spread,
        'type': label,
        'confidence': confidence,
    }
def _composite_similarity(sim_scores):
    """Blend the per-feature similarity scores into one weighted value."""
    return (
        sim_scores['ssim'] * 0.3 +
        (sim_scores['hist_h'] + sim_scores['hist_s'] + sim_scores['hist_v']) / 3 * 0.25 +
        sim_scores['edge_stability'] * 0.15 +
        sim_scores['brightness_stability'] * 0.15 +
        sim_scores['texture_stability'] * 0.15
    )


def _scene_record(scene_start, scene_end, transition_type, confidence, score, used_threshold):
    """Build one scene dictionary from its first and last frame tuples."""
    return {
        'start_frame': scene_start[0],
        'end_frame': scene_end[0],
        'start_time': scene_start[1],
        'end_time': scene_end[1],
        'duration': scene_end[1] - scene_start[1],
        'transition_type': transition_type,
        'transition_confidence': confidence,
        'similarity_score': score,
        'adaptive_threshold': used_threshold,
    }


def enhanced_scene_detection(frames_info, method='enhanced', threshold=0.5):
    """Split a frame sequence into scenes and label each closing transition.

    Consecutive frames are compared with a weighted mix of SSIM, HSV
    histogram correlation, edge, brightness and texture stability. A scene
    boundary is reported when that score falls below an adaptive threshold
    (mean minus two standard deviations of the last 50 scores, never lower
    than half of ``threshold``).

    Args:
        frames_info: List of ``(frame_number, timestamp, frame_path)``
            tuples, in playback order.
        method: Detection method name. Accepted for interface
            compatibility; it does not influence the computation.
        threshold: Base similarity threshold, used directly until more than
            50 scores have been collected.

    Returns:
        List of scene dictionaries with the keys ``start_frame``,
        ``end_frame``, ``start_time``, ``end_time``, ``duration``,
        ``transition_type``, ``transition_confidence``,
        ``similarity_score`` and ``adaptive_threshold``. Empty when fewer
        than ``WINDOW_SIZE`` frames are supplied or no frame image can be
        read.
    """
    if len(frames_info) < WINDOW_SIZE:
        return []

    print("正在提取增强特征...")
    usable_frames = []
    features_list = []
    for i, frame_info in enumerate(frames_info):
        frame_path = frame_info[2]
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread signals an unreadable file by returning None;
            # skip the frame instead of crashing inside cv2.resize.
            print(f"警告: 无法读取帧图像, 已跳过: {frame_path}")
        else:
            features_list.append(extract_enhanced_features(frame))
            usable_frames.append(frame_info)
        if i % 50 == 0:
            print(f"特征提取进度: {i+1}/{len(frames_info)}")

    if not usable_frames:
        return []

    print("正在进行增强场景检测...")
    scenes = []
    scene_start = usable_frames[0]
    similarity_window = deque(maxlen=WINDOW_SIZE)
    composite_scores = []

    for i in range(1, len(usable_frames)):
        sim_scores = enhanced_frame_similarity(features_list[i-1], features_list[i])
        composite_score = _composite_similarity(sim_scores)
        composite_scores.append(composite_score)
        similarity_window.append(composite_score)

        # Adaptive threshold once enough history exists.
        if len(composite_scores) > 50:
            recent_scores = composite_scores[-50:]
            adaptive_threshold = np.mean(recent_scores) - 2 * np.std(recent_scores)
            adaptive_threshold = max(adaptive_threshold, threshold * 0.5)  # lower bound
        else:
            adaptive_threshold = threshold

        if composite_score >= adaptive_threshold or len(similarity_window) < 10:
            continue

        # Classify the transition from the ten most recent scores.
        transition_info = detect_transition_type(
            list(similarity_window)[-10:],
            list(range(i-9, i+1))
        )
        scene_end = usable_frames[i-1]
        scene_duration = scene_end[1] - scene_start[1]

        # Hard cuts may close shorter scenes than gradual transitions.
        min_duration = 1.0 if transition_info['type'] == 'cut' else 2.0
        if scene_duration < min_duration:
            continue

        scenes.append(_scene_record(
            scene_start, scene_end,
            transition_info['type'], transition_info['confidence'],
            composite_score, adaptive_threshold,
        ))
        if VERBOSE:
            print(f"检测到{transition_info['type']}转场: 帧 {scene_end[0]}, "
                  f"时间 {timedelta(seconds=scene_end[1])}, "
                  f"相似度: {composite_score:.4f}, "
                  f"置信度: {transition_info['confidence']:.2f}")

        # NOTE(review): the next scene is started only after a scene was
        # actually recorded, so a too-short segment is merged into the
        # scene that follows rather than dropped -- confirm this nesting
        # matches the intended behaviour.
        scene_start = usable_frames[i]
        similarity_window.clear()  # start a fresh window for the new scene

    # Close the final scene at the last usable frame.
    scene_end = usable_frames[-1]
    if scene_end[1] - scene_start[1] >= 1.0:
        scenes.append(_scene_record(scene_start, scene_end, 'end', 1.0, 1.0, threshold))

    # Summary of transition types.
    transition_types = {}
    for scene in scenes:
        t_type = scene['transition_type']
        transition_types[t_type] = transition_types.get(t_type, 0) + 1

    print(f"\n增强场景检测统计:")
    print(f"检测到 {len(scenes)} 个场景, 平均时长: {sum(s['duration'] for s in scenes)/max(1, len(scenes)):.2f}")
    print("转场类型分析:")
    for t_type, count in transition_types.items():
        print(f" {t_type}: {count}")
    return scenes
def extract_frames(video_path, output_dir, sample_rate=1):
    """Decode a video and save every ``sample_rate``-th frame as a JPEG.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives ``frame_NNNNN.jpg`` files; it is
            created when missing.
        sample_rate: Keep one frame out of every ``sample_rate`` decoded
            frames.

    Returns:
        List of ``(frame_number, timestamp_seconds, frame_path)`` tuples for
        the saved frames. Empty when the video cannot be opened or reports
        no usable frame rate.
    """
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    frames_info = []
    saved_count = 0
    try:
        if not cap.isOpened():
            print(f"错误: 无法打开视频文件: {video_path}")
            return []

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if fps <= 0:
            # Timestamps are frame_number / fps, so a missing frame rate
            # makes the result meaningless (and would divide by zero).
            print(f"错误: 无法获取视频帧率: {video_path}")
            return []

        duration = frame_count / fps
        print(f"视频信息:{frame_count}帧, {fps}fps, 时长:{timedelta(seconds=duration)}")

        frame_number = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_number % sample_rate == 0:
                timestamp = frame_number / fps
                frame_path = os.path.join(output_dir, f"frame_{saved_count:05d}.jpg")
                # Record the frame only when the image was really written,
                # so later stages never receive a path to a missing file.
                if cv2.imwrite(frame_path, frame):
                    frames_info.append((frame_number, timestamp, frame_path))
                    saved_count += 1
                else:
                    print(f"警告: 无法写入帧图像: {frame_path}")
            frame_number += 1
            # Some containers report no frame count; skip the percentage
            # then instead of dividing by zero.
            if frame_number % 100 == 0 and frame_count > 0:
                print(f"处理进度: {frame_number}/{frame_count} ({frame_number/frame_count*100:.2f}%)")
    finally:
        cap.release()

    print(f"共提取了 {saved_count}")
    return frames_info
def extract_video_clips_enhanced(video_path, scenes, output_dir, ffmpeg_path=None):
    """Cut one clip per detected scene with ffmpeg and describe each clip.

    Args:
        video_path: Path of the source video.
        scenes: Scene dictionaries providing ``start_time``, ``end_time``,
            ``duration``, ``transition_type`` and ``transition_confidence``.
        output_dir: Directory that receives ``clip_NNN.mp4`` files; it is
            created when missing.
        ffmpeg_path: ffmpeg executable to run. When ``None`` it is looked up
            with ``find_ffmpeg()``.

    Returns:
        List of dictionaries (``index``, ``file``, ``start``, ``end``,
        ``duration``, ``transition_type``, ``confidence``), one per clip
        that ffmpeg produced successfully. Empty when ffmpeg is unavailable.
    """
    os.makedirs(output_dir, exist_ok=True)

    if ffmpeg_path is None:
        ffmpeg_path = find_ffmpeg()
    if ffmpeg_path is None:
        print("错误: 找不到ffmpeg。")
        return []

    print(f"\n开始切割视频: {video_path}")
    print(f"输出目录: {output_dir}")
    print("-" * 60)

    clips_info = []
    for i, scene in enumerate(scenes):
        start_time = scene['start_time']
        end_time = scene['end_time']
        duration = scene['duration']
        transition_type = scene['transition_type']
        output_file = os.path.join(output_dir, f"clip_{i + 1:03d}.mp4")

        print(f"\n切割片段 {i+1}/{len(scenes)} ({transition_type}):")
        print(f" 开始时间: {timedelta(seconds=start_time)}")
        print(f" 结束时间: {timedelta(seconds=end_time)}")
        print(f" 时长: {duration:.2f}")
        print(f" 转场类型: {transition_type} (置信度: {scene['transition_confidence']:.2f})")

        # -ss before -i seeks in the input; re-encoding keeps the cut
        # frame-accurate instead of snapping to the nearest keyframe.
        command = [
            ffmpeg_path, '-y',
            '-ss', f"{start_time:.3f}",
            '-i', video_path,
            '-t', f"{duration:.3f}",
            '-c:v', 'libx264',
            '-c:a', 'aac',
            output_file,
        ]
        try:
            subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            # A failed clip is reported and skipped so the remaining scenes
            # are still processed.
            print(f" ✗ 切割失败: {str(e)}")
            continue

        clips_info.append({
            'index': i,
            'file': output_file,
            'start': start_time,
            'end': end_time,
            'duration': duration,
            'transition_type': transition_type,
            'confidence': scene['transition_confidence']
        })
    return clips_info
def generate_analysis_report(clips_info, output_dir):
    """Write ``enhanced_analysis_report.txt`` describing each clip.

    Args:
        clips_info: Clip dictionaries providing ``index``, ``start``,
            ``end`` and ``duration``. Nothing is written when it is empty.
        output_dir: Directory that receives the report; it is created when
            missing.

    Returns:
        None.
    """
    if not clips_info:
        return

    os.makedirs(output_dir, exist_ok=True)
    report_file = os.path.join(output_dir, 'enhanced_analysis_report.txt')

    lines = [
        "增强视频切割分析报告\n",
        "=" * 50 + "\n\n",
        "\n详细片段信息:\n",
    ]
    for clip in clips_info:
        # Single-quoted f-strings so the double quotes inside them parse on
        # Python versions older than 3.12 as well.
        lines.append(f'"textIdx":{clip["index"] + 1},\n')
        lines.append(f'"time_start":{clip["start"]},\n')
        lines.append(f'"time_end":{clip["end"]},\n')
        lines.append(f' 时长: {clip["duration"]:.2f}\n')

    with open(report_file, 'w', encoding='utf-8') as f:
        f.writelines(lines)
    print(f"已生成增强分析报告: {report_file}")
def process_video_enhanced(video_path, output_base_dir, sample_rate, method, threshold, ffmpeg_path):
    """Run the whole pipeline for one video.

    The steps are: extract frames, detect scenes, cut one clip per scene,
    write the analysis report. All output goes to a sub-directory of
    ``output_base_dir`` named after the video file.

    Args:
        video_path: Path of the video to process.
        output_base_dir: Directory under which the per-video output
            directory is created.
        sample_rate: Frame sampling step passed to ``extract_frames``.
        method: Detection method name passed to
            ``enhanced_scene_detection``.
        threshold: Base similarity threshold for scene detection.
        ffmpeg_path: ffmpeg executable used to cut the clips.

    Returns:
        True when scenes were detected and the remaining steps ran without
        raising; False when no scene was found or any step raised.
    """
    video_filename = os.path.splitext(os.path.basename(video_path))[0]
    video_output_dir = os.path.join(output_base_dir, video_filename)
    frames_dir = os.path.join(video_output_dir, 'frames')
    clips_dir = os.path.join(video_output_dir, 'clips')
    for dir_path in (video_output_dir, frames_dir, clips_dir):
        os.makedirs(dir_path, exist_ok=True)

    print("\n增强处理参数:")
    print(f"输入视频: {os.path.abspath(video_path)}")
    print(f"输出目录: {os.path.abspath(video_output_dir)}")
    print(f"检测方法: {method} (增强版)")
    print(f"滑动窗口大小: {WINDOW_SIZE}")
    print("-" * 60)

    try:
        # Step 1: extract frames.
        print("\n步骤1: 正在提取视频帧...")
        frames_info = extract_frames(video_path, frames_dir, sample_rate)

        # Step 2: enhanced scene detection.
        print("\n步骤2: 正在进行增强场景检测...")
        scenes = enhanced_scene_detection(frames_info, method, threshold)
        if not scenes:
            print("未检测到场景变化")
            return False

        # Step 3: cut one clip per scene.
        print("\n步骤3: 正在切割视频片段...")
        clips_info = extract_video_clips_enhanced(video_path, scenes, clips_dir, ffmpeg_path)

        # Step 4: write the analysis report for the clips that were cut.
        print("\n步骤4: 正在生成分析报告...")
        generate_analysis_report(clips_info, video_output_dir)
    except Exception as e:
        # Batch boundary: report the failure and let the caller continue
        # with the next video instead of aborting the whole run.
        print(f"处理视频时出错: {e}")
        return False

    print("\n增强处理完成!")
    return True
def get_video_files(directory):
    """Collect the video files found at ``directory``.

    Args:
        directory: Either a single file path or a directory to walk
            recursively.

    Returns:
        List of paths whose lower-cased extension is in
        ``VIDEO_EXTENSIONS``. A single matching file yields a one-element
        list; anything else yields whatever the recursive walk finds.
    """
    # A direct path to one video file short-circuits the directory walk.
    if os.path.isfile(directory):
        suffix = os.path.splitext(directory)[1].lower()
        if suffix in VIDEO_EXTENSIONS:
            return [directory]

    return [
        os.path.join(folder, name)
        for folder, _, names in os.walk(directory)
        for name in names
        if os.path.splitext(name)[1].lower() in VIDEO_EXTENSIONS
    ]
def get_parent_folder_name(path):
    """Return the name of the folder that contains the given video folder.

    Args:
        path: A directory, or a file located inside a directory.

    Returns:
        For an existing directory, the name of its parent directory. For
        any other path, the name of the parent of the directory that holds
        it.
    """
    resolved = os.path.abspath(path)
    if os.path.isdir(resolved):
        # The directory itself is the reference folder.
        anchor = resolved.rstrip('/')
    else:
        # Treat the path as a file: its containing directory is the
        # reference folder.
        anchor = os.path.dirname(resolved)
    return os.path.basename(os.path.dirname(anchor))
def main():
    """Entry point: process every video under ``INPUT_VIDEO_PATH``.

    Looks up ffmpeg, gathers the video files, derives the output directory
    from the input location, runs ``process_video_enhanced`` on each video
    and prints a success/failure summary.
    """
    rule = "=" * 60

    print(rule)
    print("智能视频切割工具 - 增强版 (支持复杂转场检测)")
    print(rule)

    ffmpeg_path = find_ffmpeg()
    if not ffmpeg_path:
        print("警告: 未找到ffmpeg视频切割功能将不可用")
    else:
        print(f"已找到ffmpeg: {ffmpeg_path}")

    video_files = get_video_files(INPUT_VIDEO_PATH)
    if not video_files:
        print(f"错误: 在 '{INPUT_VIDEO_PATH}' 中没有找到视频文件")
        return

    # Output goes to "<parent folder name>_enhanced" below OUTPUT_DIR.
    parent_folder_name = get_parent_folder_name(INPUT_VIDEO_PATH)
    output_dir = os.path.join(OUTPUT_DIR, f"{parent_folder_name}_enhanced")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    print(f"\n增强版输出目录: {output_dir}")

    total = len(video_files)
    outcomes = []
    for position, video_path in enumerate(video_files, start=1):
        print("\n" + rule)
        print(f"正在处理视频 [{position}/{total}]: {os.path.basename(video_path)}")
        print(rule)
        outcome = process_video_enhanced(
            video_path=video_path,
            output_base_dir=output_dir,
            sample_rate=SAMPLE_RATE,
            method=METHOD,
            threshold=THRESHOLD,
            ffmpeg_path=ffmpeg_path
        )
        outcomes.append(bool(outcome))

    successful = sum(outcomes)
    failed = len(outcomes) - successful

    print("\n" + rule)
    print("增强版批量处理完成!")
    print(rule)
    print(f"总共处理: {len(video_files)} 个视频文件")
    print(f"成功: {successful}")
    print(f"失败: {failed}")
    print(f"输出目录: {os.path.abspath(output_dir)}")


if __name__ == "__main__":
    main()