# hot_video_analyse/new_VideoSplitter.py

import cv2
import numpy as np
import os
import subprocess
import shutil
from datetime import timedelta
import argparse
from sklearn.metrics.pairwise import cosine_similarity
from skimage.metrics import structural_similarity as ssim
from scipy import stats
from collections import deque
# Fixed input/output paths used by the batch entry point (no CLI args).
INPUT_VIDEO_PATH = "/root/autodl-tmp/kuaishou_demo"
OUTPUT_DIR = "/root/autodl-tmp/02_VideoSplitter/VideoSplitter_output"
# Video container extensions recognised by get_video_files.
VIDEO_EXTENSIONS = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']
# Detection parameter defaults.
SAMPLE_RATE = 1
METHOD = "enhanced"  # the multi-feature "enhanced" detection method
THRESHOLD = 0.5
VERBOSE = True
# Extra tuning knobs for the enhanced detector.
WINDOW_SIZE = 30  # sliding-window length, in sampled frames
GRADIENT_THRESHOLD = 0.02  # gradual-transition threshold — NOTE(review): not referenced in this file, confirm
EDGE_DENSITY_THRESHOLD = 0.3  # edge-density change threshold — NOTE(review): not referenced in this file, confirm
COLOR_HIST_THRESHOLD = 0.4  # colour-histogram change threshold — NOTE(review): not referenced in this file, confirm
# Candidate ffmpeg locations tried by find_ffmpeg as a fallback.
FFMPEG_PATHS = [
    'ffmpeg',
    '/usr/bin/ffmpeg',
    '/usr/local/bin/ffmpeg',
    'C:\\ffmpeg\\bin\\ffmpeg.exe',
]
def find_ffmpeg():
    """Locate a usable ffmpeg executable.

    Tries the platform lookup command first (``where`` on Windows,
    ``which`` elsewhere), then falls back to the static candidates in
    FFMPEG_PATHS. Returns the executable path, or None if nothing works.
    """
    locator = ['where', 'ffmpeg'] if os.name == 'nt' else ['which', 'ffmpeg']
    try:
        probe = subprocess.run(locator, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, text=True)
        if probe.returncode == 0:
            found = probe.stdout.strip()
            # `where` can print several matches, one per line; keep the first.
            return found.split('\n')[0] if os.name == 'nt' else found
    except Exception:
        # Lookup command itself unavailable; fall through to the candidates.
        pass
    for candidate in FFMPEG_PATHS:
        if shutil.which(candidate):
            return candidate
    return None
def extract_enhanced_features(frame):
    """Build a multi-channel feature bundle for one video frame.

    Args:
        frame: BGR image array (as produced by cv2.imread / VideoCapture).
    Returns:
        dict with the downscaled gray image, 50-bin HSV histograms,
        edge density, brightness statistics and Sobel texture energy.
    """
    # Work at 320x180 to keep the per-frame cost low.
    small = cv2.resize(frame, (320, 180))
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)

    # 50-bin histogram per HSV channel (hue spans 0-180 in OpenCV).
    hsv = cv2.cvtColor(small, cv2.COLOR_BGR2HSV)
    hue_hist = cv2.calcHist([hsv], [0], None, [50], [0, 180])
    sat_hist = cv2.calcHist([hsv], [1], None, [50], [0, 256])
    val_hist = cv2.calcHist([hsv], [2], None, [50], [0, 256])

    # Fraction of pixels Canny marks as edges.
    edges = cv2.Canny(gray, 50, 150)
    edge_density = np.count_nonzero(edges) / (edges.shape[0] * edges.shape[1])

    # Simple texture proxy: mean gradient magnitude from Sobel filters.
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    texture_energy = np.mean(np.sqrt(grad_x**2 + grad_y**2))

    return {
        'gray': gray,
        'hist_h': hue_hist.flatten(),
        'hist_s': sat_hist.flatten(),
        'hist_v': val_hist.flatten(),
        'edge_density': edge_density,
        'brightness_mean': np.mean(gray),
        'brightness_std': np.std(gray),
        'texture_energy': texture_energy,
    }
def enhanced_frame_similarity(features1, features2):
    """Score how alike two frames are along several independent axes.

    Args:
        features1, features2: dicts from extract_enhanced_features.
    Returns:
        dict of per-channel scores: SSIM, HSV histogram correlations,
        and edge/brightness/texture stability terms (each nominally in
        [0, 1]; histogram correlation can go negative).
    """
    # Structural similarity on the downscaled gray images.
    scores = {'ssim': ssim(features1['gray'], features2['gray'])}
    # Histogram correlation per HSV channel (1.0 means identical).
    for key in ('hist_h', 'hist_s', 'hist_v'):
        scores[key] = cv2.compareHist(features1[key], features2[key], cv2.HISTCMP_CORREL)
    # Edge-density difference, normalised so a gap of 0.5+ scores 0.
    edge_gap = abs(features1['edge_density'] - features2['edge_density'])
    scores['edge_stability'] = 1.0 - min(edge_gap / 0.5, 1.0)
    # Mean-brightness difference relative to the full 8-bit range.
    bright_gap = abs(features1['brightness_mean'] - features2['brightness_mean']) / 255.0
    scores['brightness_stability'] = 1.0 - bright_gap
    # Texture-energy difference, clamped at 100 units.
    texture_gap = abs(features1['texture_energy'] - features2['texture_energy'])
    scores['texture_stability'] = 1.0 - min(texture_gap / 100.0, 1.0)
    return scores
def detect_transition_type(similarity_window, frame_indices):
    """Classify the transition suggested by a short similarity time-series.

    Args:
        similarity_window: sequence of composite similarity scores.
        frame_indices: frame numbers matching the window (currently unused,
            kept for interface compatibility).
    Returns:
        dict with trend statistics plus a 'type' label
        (fade/dissolve/cut/complex/stable/unknown) and a 'confidence'.
    """
    if len(similarity_window) < 5:
        # Not enough samples to fit a trend.
        return {'type': 'unknown', 'confidence': 0.0}

    # Linear trend over the window.
    xs = np.arange(len(similarity_window))
    slope, _intercept, r_value, _p_value, _std_err = stats.linregress(xs, similarity_window)
    r_squared = r_value ** 2

    # Frame-to-frame deltas: the most negative one is the sharpest drop.
    deltas = np.diff(similarity_window)
    max_drop = np.min(deltas) if len(deltas) > 0 else 0
    total_change = similarity_window[-1] - similarity_window[0]
    spread = np.std(similarity_window)

    info = {
        'slope': slope,
        'r_squared': r_squared,
        'max_drop': max_drop,
        'total_change': total_change,
        'std': spread,
    }

    if r_squared > 0.7 and slope < -0.02:
        # Consistent linear decline: a gradual transition.
        label = ('fade', 0.8) if abs(max_drop) < 0.1 else ('dissolve', 0.7)
    elif abs(max_drop) > 0.3:
        # One abrupt plunge: a hard cut.
        label = ('cut', 0.9)
    elif spread > 0.1 and total_change < -0.2:
        # Noisy but overall falling: some complex transition.
        label = ('complex', 0.6)
    else:
        label = ('stable', 0.5)
    info['type'], info['confidence'] = label
    return info
def enhanced_scene_detection(frames_info, method='enhanced', threshold=0.5):
    """
    Enhanced scene-change detection over a list of extracted frames.

    Args:
        frames_info: list of (frame_number, timestamp, frame_path) tuples
            as produced by extract_frames.
        method: detection method name (informational only — this function
            always runs the enhanced pipeline).
        threshold: base similarity threshold; the effective threshold
            adapts to the recent score distribution.
    Returns:
        scenes: list of scene dicts, each with transition type/confidence,
            timing and the threshold in force at the cut.
    """
    # Too little data for windowed analysis.
    if len(frames_info) < WINDOW_SIZE:
        return []
    print("正在提取增强特征...")
    features_list = []
    # Extract the multi-channel features for every frame up front.
    for i, (frame_num, timestamp, frame_path) in enumerate(frames_info):
        # NOTE(review): cv2.imread returns None for unreadable files, which
        # would make extract_enhanced_features raise — assumes every frame
        # file written by extract_frames is readable; confirm.
        frame = cv2.imread(frame_path)
        features = extract_enhanced_features(frame)
        features_list.append(features)
        if i % 50 == 0:
            print(f"特征提取进度: {i+1}/{len(frames_info)}")
    print("正在进行增强场景检测...")
    # Sliding-window analysis over consecutive-frame similarities.
    scenes = []
    scene_start = frames_info[0]
    similarity_window = deque(maxlen=WINDOW_SIZE)
    composite_scores = []
    for i in range(1, len(frames_info)):
        # Multi-dimensional similarity between frame i-1 and frame i.
        sim_scores = enhanced_frame_similarity(features_list[i-1], features_list[i])
        # Weighted composite score (weights sum to 1.0).
        composite_score = (
            sim_scores['ssim'] * 0.3 +
            (sim_scores['hist_h'] + sim_scores['hist_s'] + sim_scores['hist_v']) / 3 * 0.25 +
            sim_scores['edge_stability'] * 0.15 +
            sim_scores['brightness_stability'] * 0.15 +
            sim_scores['texture_stability'] * 0.15
        )
        composite_scores.append(composite_score)
        similarity_window.append(composite_score)
        # Adaptive threshold: mean - 2*std of the last 50 scores,
        # floored at half the base threshold.
        if len(composite_scores) > 50:
            recent_scores = composite_scores[-50:]
            adaptive_threshold = np.mean(recent_scores) - 2 * np.std(recent_scores)
            adaptive_threshold = max(adaptive_threshold, threshold * 0.5)  # lower bound
        else:
            adaptive_threshold = threshold
        # Scene change: score fell below the threshold and the window has
        # accumulated enough history to classify the transition.
        if composite_score < adaptive_threshold and len(similarity_window) >= 10:
            # Classify the transition from the last 10 window entries.
            transition_info = detect_transition_type(
                list(similarity_window)[-10:],
                list(range(i-9, i+1))
            )
            scene_end = frames_info[i-1]
            scene_duration = scene_end[1] - scene_start[1]
            # Hard cuts are allowed to produce shorter scenes than
            # gradual transitions.
            min_duration = 1.0 if transition_info['type'] == 'cut' else 2.0
            if scene_duration >= min_duration:
                scenes.append({
                    'start_frame': scene_start[0],
                    'end_frame': scene_end[0],
                    'start_time': scene_start[1],
                    'end_time': scene_end[1],
                    'duration': scene_duration,
                    'transition_type': transition_info['type'],
                    'transition_confidence': transition_info['confidence'],
                    'similarity_score': composite_score,
                    'adaptive_threshold': adaptive_threshold
                })
                if VERBOSE:
                    print(f"检测到{transition_info['type']}转场: 帧 {scene_end[0]}, "
                          f"时间 {timedelta(seconds=scene_end[1])}, "
                          f"相似度: {composite_score:.4f}, "
                          f"置信度: {transition_info['confidence']:.2f}")
            # Restart from the current frame even when the candidate scene
            # was too short to record.
            scene_start = frames_info[i]
            similarity_window.clear()  # reset the window for the new scene
    # Close out the final scene up to the last extracted frame.
    if len(frames_info) > 0:
        scene_end = frames_info[-1]
        scene_duration = scene_end[1] - scene_start[1]
        if scene_duration >= 1.0:
            scenes.append({
                'start_frame': scene_start[0],
                'end_frame': scene_end[0],
                'start_time': scene_start[1],
                'end_time': scene_end[1],
                'duration': scene_duration,
                'transition_type': 'end',
                'transition_confidence': 1.0,
                'similarity_score': 1.0,
                'adaptive_threshold': threshold
            })
    # Tally transitions by type for the summary printout.
    transition_types = {}
    for scene in scenes:
        t_type = scene['transition_type']
        transition_types[t_type] = transition_types.get(t_type, 0) + 1
    print(f"\n增强场景检测统计:")
    print(f"检测到 {len(scenes)} 个场景, 平均时长: {sum(s['duration'] for s in scenes)/max(1, len(scenes)):.2f}")
    print("转场类型分析:")
    for t_type, count in transition_types.items():
        print(f" {t_type}: {count}")
    return scenes
def extract_frames(video_path, output_dir, sample_rate=1):
    """Extract every *sample_rate*-th frame of a video to JPEG files.

    Args:
        video_path: path of the video to read.
        output_dir: directory receiving frame_XXXXX.jpg files
            (created if missing).
        sample_rate: keep one frame out of every `sample_rate` frames.
    Returns:
        list of (frame_number, timestamp_seconds, frame_path) tuples;
        empty when the video cannot be opened.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    cap = cv2.VideoCapture(video_path)
    # Guard: an unreadable file reports fps == 0, which previously caused
    # a ZeroDivisionError in the duration computation below.
    if not cap.isOpened():
        print(f"错误: 无法打开视频: {video_path}")
        return []
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if fps <= 0:
        # Some containers report no frame rate; fall back to a sane default
        # so timestamps stay finite instead of crashing.
        fps = 30.0
    duration = frame_count / fps
    print(f"视频信息:{frame_count}帧, {fps}fps, 时长:{timedelta(seconds=duration)}")
    frames_info = []
    frame_number = 0
    saved_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_number % sample_rate == 0:
            timestamp = frame_number / fps
            frame_path = os.path.join(output_dir, f"frame_{saved_count:05d}.jpg")
            cv2.imwrite(frame_path, frame)
            frames_info.append((frame_number, timestamp, frame_path))
            saved_count += 1
        frame_number += 1
        if frame_number % 100 == 0:
            # max() guards against containers that report 0 total frames.
            print(f"处理进度: {frame_number}/{frame_count} ({frame_number/max(frame_count, 1)*100:.2f}%)")
    cap.release()
    print(f"共提取了 {saved_count} 帧")
    return frames_info
def get_video_files(directory, extensions=None):
    """Collect video files under *directory* (or check a single file).

    Generalized: the recognised extension list is now a parameter so the
    function is reusable outside this module's configuration.

    Args:
        directory: a directory to walk recursively, or a single file path.
        extensions: optional iterable of lowercase dotted extensions
            (e.g. ['.mp4']); defaults to the module-wide VIDEO_EXTENSIONS.
    Returns:
        list of matching file paths (empty when nothing matches).
    """
    allowed = {ext.lower() for ext in (extensions if extensions is not None else VIDEO_EXTENSIONS)}
    if os.path.isfile(directory):
        # A single file either matches or yields nothing (the original
        # fell through to os.walk, which produces nothing for a file).
        return [directory] if os.path.splitext(directory)[1].lower() in allowed else []
    video_files = []
    for root, _, files in os.walk(directory):
        for name in files:
            if os.path.splitext(name)[1].lower() in allowed:
                video_files.append(os.path.join(root, name))
    return video_files
def get_parent_folder_name(path):
    """Return the name of the folder one level above the 'video' level.

    For an existing directory, this is the name of its parent directory;
    anything else is treated as a file path and the grandparent
    directory's name is returned.
    """
    resolved = os.path.abspath(path)
    if os.path.isdir(resolved):
        parent_dir = os.path.dirname(resolved.rstrip('/'))
    else:
        parent_dir = os.path.dirname(os.path.dirname(resolved))
    return os.path.basename(parent_dir)
def process_single_video(video_path, output_base_dir):
    """Run the full pipeline (frame extraction, scene detection, reports)
    for one video.

    Args:
        video_path: path of the video file to analyse.
        output_base_dir: directory under which the per-video folder goes.
    Returns:
        dict summarising the run: status, scene list and report paths.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"正在处理视频: {os.path.basename(video_path)}")
    print(f"{banner}")

    # Per-video output folder named after the file (without extension).
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_dir = os.path.join(output_base_dir, video_name)
    os.makedirs(output_dir, exist_ok=True)

    # Step 1: dump sampled frames to disk.
    frames_dir = os.path.join(output_dir, "frames")
    print(f"正在提取帧到: {frames_dir}")
    frame_records = extract_frames(video_path, frames_dir, SAMPLE_RATE)
    if not frame_records:
        print(f"警告: 无法从视频 {video_path} 提取帧")
        return {
            'video_path': video_path,
            'status': 'failed',
            'error': '无法提取帧',
            'scenes': []
        }

    # Step 2: scene detection over the extracted frames.
    print(f"正在进行场景检测...")
    scenes = enhanced_scene_detection(frame_records, METHOD, THRESHOLD)

    # Step 3: human-readable report plus machine-readable JSON.
    report_path = os.path.join(output_dir, f"{video_name}_analysis_report.txt")
    generate_detailed_report(video_path, frame_records, scenes, report_path)
    json_path = os.path.join(output_dir, f"{video_name}_scenes.json")
    generate_scenes_json(video_path, scenes, json_path)

    print(f"处理完成: {video_name}")
    print(f"检测到 {len(scenes)} 个场景")
    print(f"报告文件: {report_path}")
    print(f"JSON文件: {json_path}")
    return {
        'video_path': video_path,
        'status': 'success',
        'scenes_count': len(scenes),
        'scenes': scenes,
        'report_path': report_path,
        'json_path': json_path
    }
def generate_detailed_report(video_path, frames_info, scenes, report_path):
    """Write a human-readable UTF-8 analysis report for one video.

    Args:
        video_path: source video path (only the basename is displayed).
        frames_info: list of (frame_number, timestamp, frame_path) tuples.
        scenes: scene dicts from enhanced_scene_detection.
        report_path: destination text file path.
    """
    # Bug fix: the module header only does `from datetime import timedelta`,
    # so the bare name `datetime` was unbound when this module is imported
    # (it only worked when run as a script, via the import in the
    # `__main__` guard). Import locally so the function is self-contained.
    import datetime
    video_name = os.path.basename(video_path)
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write("增强视频转场分析报告\n")
        f.write("=" * 60 + "\n\n")
        f.write(f"视频文件: {video_name}\n")
        f.write(f"视频路径: {video_path}\n")
        f.write(f"分析时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"总帧数: {len(frames_info)}\n")
        f.write(f"检测到场景数: {len(scenes)}\n\n")
        # Tally transitions by type.
        transition_stats = {}
        for scene in scenes:
            t_type = scene['transition_type']
            transition_stats[t_type] = transition_stats.get(t_type, 0) + 1
        f.write("转场类型统计:\n")
        f.write("-" * 30 + "\n")
        for t_type, count in transition_stats.items():
            percentage = (count / len(scenes)) * 100 if scenes else 0
            f.write(f"{t_type}: {count} 个 ({percentage:.1f}%)\n")
        f.write("\n")
        # Scene-duration summary statistics.
        durations = [scene['duration'] for scene in scenes]
        if durations:
            f.write("场景时长统计:\n")
            f.write("-" * 30 + "\n")
            f.write(f"平均时长: {np.mean(durations):.2f}\n")
            f.write(f"最短时长: {np.min(durations):.2f}\n")
            f.write(f"最长时长: {np.max(durations):.2f}\n")
            f.write(f"时长标准差: {np.std(durations):.2f}\n\n")
        # Per-scene detail listing.
        f.write("详细场景信息:\n")
        f.write("-" * 60 + "\n")
        for i, scene in enumerate(scenes, 1):
            f.write(f"场景 {i}:\n")
            f.write(f" 开始帧: {scene['start_frame']}\n")
            f.write(f" 结束帧: {scene['end_frame']}\n")
            f.write(f" 开始时间: {timedelta(seconds=scene['start_time'])}\n")
            f.write(f" 结束时间: {timedelta(seconds=scene['end_time'])}\n")
            f.write(f" 时长: {scene['duration']:.2f}\n")
            f.write(f" 转场类型: {scene['transition_type']}\n")
            f.write(f" 置信度: {scene['transition_confidence']:.3f}\n")
            f.write(f" 相似度分数: {scene['similarity_score']:.4f}\n")
            f.write(f" 自适应阈值: {scene['adaptive_threshold']:.4f}\n")
            f.write("\n")
def generate_scenes_json(video_path, scenes, json_path):
    """Write the detected scenes as JSON ({'content': [...]}) to json_path.

    Args:
        video_path: source video path (not included in the payload; kept
            for interface compatibility).
        scenes: scene dicts from enhanced_scene_detection.
        json_path: destination JSON file path.
    """
    import json
    # Bug fix: the dict was initialised with key 'content' but the loop
    # appended to scenes_data['scenes'], raising KeyError for any
    # non-empty scene list. Append to the declared 'content' key.
    scenes_data = {
        'content': []
    }
    for i, scene in enumerate(scenes, 1):
        scenes_data['content'].append({
            'type': scene['transition_type'],
            'scene_index': i,
            'start_time': scene['start_time'],
            'end_time': scene['end_time'],
            'duration': scene['duration'],
        })
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(scenes_data, f, indent=2, ensure_ascii=False)
def batch_process_videos(input_dir, output_dir):
    """Analyse every video found under *input_dir*, writing one report
    set per video into *output_dir*.

    Args:
        input_dir: directory (or single file) to scan for videos.
        output_dir: base directory for per-video output folders.
    """
    rule = "=" * 80
    print(rule)
    print("增强视频转场分析工具 - 批量处理模式")
    print(rule)

    # ffmpeg is optional; report its availability up front.
    ffmpeg_path = find_ffmpeg()
    if ffmpeg_path:
        print(f"已找到ffmpeg: {ffmpeg_path}")
    else:
        print("警告: 未找到ffmpeg某些功能可能受限")

    video_files = get_video_files(input_dir)
    if not video_files:
        print(f"错误: 在 '{input_dir}' 中没有找到视频文件")
        print(f"支持的格式: {', '.join(VIDEO_EXTENSIONS)}")
        return

    print(f"找到 {len(video_files)} 个视频文件:")
    for idx, video_file in enumerate(video_files, 1):
        print(f" {idx}. {os.path.basename(video_file)}")

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Process each video independently: one failure must not stop the batch.
    for idx, video_path in enumerate(video_files, 1):
        print(f"\n处理进度: {idx}/{len(video_files)}")
        try:
            process_single_video(video_path, output_dir)
        except Exception as e:
            print(f"处理视频 {os.path.basename(video_path)} 时出错: {e}")

    print(f"\n{rule}")
    print("批量处理完成! 每个视频已生成单独报告。")
    print(f"{rule}")
def main():
    """Entry point: print the configured parameters, validate the input
    path and run the batch pipeline.

    Fix: removed a dead function-local `import datetime` — nothing in this
    scope used it (a local import cannot serve other functions' global
    lookups, so it did not help generate_detailed_report either).
    """
    print("增强视频转场分析工具 - 批量处理模式")
    print(f"输入目录: {INPUT_VIDEO_PATH}")
    print(f"输出目录: {OUTPUT_DIR}")
    print(f"采样率: {SAMPLE_RATE}")
    print(f"检测阈值: {THRESHOLD}")
    print(f"检测方法: {METHOD}")
    # Abort early when the configured input location does not exist.
    if not os.path.exists(INPUT_VIDEO_PATH):
        print(f"错误: 输入路径不存在: {INPUT_VIDEO_PATH}")
        return
    batch_process_videos(INPUT_VIDEO_PATH, OUTPUT_DIR)
if __name__ == "__main__":
    # This top-level import binds the module-global name `datetime`, which
    # generate_detailed_report reads at call time (the module header only
    # imports `timedelta` from the datetime package) — do not remove.
    import datetime
    main()