video_translation/batch/utils/video_processor.py
2025-08-20 11:39:34 +08:00

105 lines
3.7 KiB
Python

import os
from core.st_utils.imports_and_utils import *
from core.utils.onekeycleanup import cleanup
from core.utils import load_key
import shutil
from functools import partial
from rich.panel import Panel
from rich.console import Console
from core import *
# Shared rich console used by process_video to print step/status panels.
console = Console()
# Folder that local (non-URL) input files are read from.
INPUT_DIR = 'batch/input'
# Working folder: wiped by prepare_output_folder at the start of a non-retry
# run, and the destination that local input files are copied into.
OUTPUT_DIR = 'output'
# Passed to cleanup() once every step has completed successfully.
SAVE_DIR = 'batch/output'
# Passed to cleanup() when a step has failed on its final attempt.
ERROR_OUTPUT_DIR = 'batch/output/ERROR'
# Config key handed to load_key() to obtain the resolution for URL downloads.
YTB_RESOLUTION_KEY = "ytb_resolution"
def process_video(file, dubbing=False, is_retry=False):
    """Run the subtitle pipeline (and optionally dubbing) for one input.

    Every step is tried up to three times. A step that still fails on its
    last attempt aborts the run: the error is printed, ``cleanup`` is called
    with ``ERROR_OUTPUT_DIR`` and a failure tuple is returned.

    Args:
        file: Input identifier forwarded to ``process_input_file``.
        dubbing: When true, the dubbing steps run after the subtitle steps.
        is_retry: When true, the existing ``OUTPUT_DIR`` is kept instead of
            being wiped before the run.

    Returns:
        ``(True, "", "")`` when all steps succeed, otherwise
        ``(False, step_name, error_message)`` for the step that failed.
    """
    max_attempts = 3

    # A fresh run starts from an empty working folder; a retry reuses it.
    if not is_retry:
        prepare_output_folder(OUTPUT_DIR)

    text_steps = [
        ("🎥 Processing input file", partial(process_input_file, file)),
        ("🎙️ Transcribing with Whisper", partial(_2_asr.transcribe)),
        ("✂️ Splitting sentences", split_sentences),
        ("📝 Summarizing and translating", summarize_and_translate),
        ("⚡ Processing and aligning subtitles", process_and_align_subtitles),
        ("🎬 Merging subtitles to video", _7_sub_into_vid.merge_subtitles_to_video),
    ]
    if dubbing:
        text_steps += [
            ("🔊 Generating audio tasks", gen_audio_tasks),
            ("🎵 Extracting reference audio", _9_refer_audio.extract_refer_audio_main),
            ("🗣️ Generating audio", _10_gen_audio.gen_audio),
            ("🔄 Merging full audio", _11_merge_audio.merge_full_audio),
            ("🎞️ Merging dubbing to video", _12_dub_to_vid.merge_video_audio),
        ]

    for step_name, step_func in text_steps:
        for attempt in range(max_attempts):
            out_of_attempts = attempt == max_attempts - 1
            try:
                # The attempt counter is only shown once a retry is underway.
                attempt_note = None
                if attempt > 0:
                    attempt_note = f"Attempt {attempt + 1}/3"
                console.print(Panel(
                    f"[bold green]{step_name}[/]",
                    subtitle=attempt_note,
                    border_style="blue",
                ))
                outcome = step_func()
                # Steps that return a mapping publish its entries as
                # module-level globals (e.g. 'video_file').
                if outcome is not None:
                    globals().update(outcome)
            except Exception as exc:
                if out_of_attempts:
                    console.print(Panel(
                        f"[bold red]Error in step '{step_name}':[/]\n{str(exc)}",
                        border_style="red",
                    ))
                    cleanup(ERROR_OUTPUT_DIR)
                    return False, step_name, str(exc)
                console.print(Panel(
                    f"[yellow]Attempt {attempt + 1} failed. Retrying...[/]",
                    border_style="yellow",
                ))
            else:
                # Step succeeded; move on to the next one.
                break

    console.print(Panel("[bold green]All steps completed successfully! 🎉[/]", border_style="green"))
    cleanup(SAVE_DIR)
    return True, "", ""
def prepare_output_folder(output_folder):
    """Leave ``output_folder`` as a freshly created, empty directory.

    Args:
        output_folder: Path of the directory to reset. Any existing tree at
            this path is removed before the directory is recreated.
    """
    stale_tree_present = os.path.exists(output_folder)
    if stale_tree_present:
        # Drop everything from a previous run before recreating the folder.
        shutil.rmtree(output_folder)
    os.makedirs(output_folder)
def process_input_file(file):
    """Download or stage the input video and report where it ended up.

    Args:
        file: Either an ``http://`` / ``https://`` URL to download, or the
            name of a file located in ``INPUT_DIR``.

    Returns:
        A dict with the single key ``'video_file'``. For a URL this is
        whatever ``_1_ytdlp.find_video_files()`` returns after the download;
        for a local file it is the path of the copy placed in ``OUTPUT_DIR``.
    """
    # Match the full scheme rather than the bare "http" prefix, so a local
    # file whose name merely begins with "http" (e.g. "http_intro.mp4") is
    # not mistaken for a URL and sent to the downloader.
    if file.startswith(('http://', 'https://')):
        _1_ytdlp.download_video_ytdlp(file, resolution=load_key(YTB_RESOLUTION_KEY))
        video_file = _1_ytdlp.find_video_files()
    else:
        # Use the module constant instead of repeating the folder literal.
        input_file = os.path.join(INPUT_DIR, file)
        output_file = os.path.join(OUTPUT_DIR, file)
        shutil.copy(input_file, output_file)
        video_file = output_file
    return {'video_file': video_file}
def split_sentences():
    """Run the two sentence-splitting stages back to back.

    Calls ``split_by_spacy`` first and ``split_sentences_by_meaning`` second.
    Returns ``None``.
    """
    stages = (
        _3_1_split_nlp.split_by_spacy,
        _3_2_split_meaning.split_sentences_by_meaning,
    )
    # Stages run strictly in the order listed.
    for stage in stages:
        stage()
def summarize_and_translate():
    """Run the summary stage followed by the translation stage.

    Calls ``get_summary`` first and ``translate_all`` second.
    Returns ``None``.
    """
    stages = (
        _4_1_summarize.get_summary,
        _4_2_translate.translate_all,
    )
    # Stages run strictly in the order listed.
    for stage in stages:
        stage()
def process_and_align_subtitles():
    """Run the subtitle splitting stage followed by timestamp alignment.

    Calls ``split_for_sub_main`` first and ``align_timestamp_main`` second.
    Returns ``None``.
    """
    stages = (
        _5_split_sub.split_for_sub_main,
        _6_gen_sub.align_timestamp_main,
    )
    # Stages run strictly in the order listed.
    for stage in stages:
        stage()
def gen_audio_tasks():
    """Run the audio-task stage followed by the dub-chunk stage.

    Calls ``gen_audio_task_main`` first and ``gen_dub_chunks`` second.
    Returns ``None``.
    """
    stages = (
        _8_1_audio_task.gen_audio_task_main,
        _8_2_dub_chunks.gen_dub_chunks,
    )
    # Stages run strictly in the order listed.
    for stage in stages:
        stage()