"일꾼이 일을 잘하려면 먼저 도구를 갈고 닦아야 한다." - 공자, 『논어』.
첫 장 > 프로그램 작성 > 맞춤형 전사 및 클리핑 파이프라인

맞춤형 전사 및 클리핑 파이프라인

2024-08-01에 게시됨
검색:239

Custom Transcription and Clipping Pipeline

내가 그랬던 이유:

저는 이 프로젝트를 진행하면서 강력한 데이터 엔지니어링 구성 요소 게시를 완료하기 위한 여러 도구를 개발했습니다. 그 중 일부는 독창적이지만 대부분은 다음 Gemini 모델에 급습되어 통합될 수 있었습니다. 멍청한 Google Colab Gemini 제안 엔진. - 팀

지침 및 설명

지침:
  1. 필요한 종속성이 설치되어 있는지 확인하세요(예: ffmpeg, Whisperx).
  2. 비디오 파일이 포함된 작업 디렉터리로 루트 디렉터리를 설정합니다.
  3. 트랜스크립트에서 감지하려는 단계를 정의하세요.
  4. 스크립트를 실행하여 스크립트를 생성하고 감지된 단계를 기반으로 비디오 클립을 추출합니다.
설명:
  • 이 도구는 루트 디렉터리에 있는 비디오 파일을 처리합니다.
  • WhisperX 모델을 사용하여 각 동영상을 기록합니다.
  • 그런 다음 스크립트는 대본에서 발견된 단계를 기반으로 비디오에서 클립을 추출합니다.
  • 대본과 클립은 지정된 출력 디렉터리에 저장됩니다.

암호:

import os
import shutil
import cv2
import numpy as np
import json
from PIL import Image
import random
import string
from rembg import remove
import ffmpeg
from datetime import timedelta
from ultralytics import YOLO
import whisperx
import gc
gc.collect()

# Define paths to directories
root = '/

workspace/'
stages = ['apple', 'banana', 'car', 'dog']

transcript_dir = root   'transcripts'
clip_output_dir = root   'stage1'
stage1_clips_dir = clip_output_dir

# Ensure the output directory exists
os.makedirs(transcript_dir, exist_ok=True)
os.makedirs(clip_output_dir, exist_ok=True)

def log_and_print(message):
    print(message)

def convert_time_to_seconds(time_str):
    hours, minutes, seconds_milliseconds = time_str.split(':')
    seconds, milliseconds = seconds_milliseconds.split(',')
    total_seconds = int(hours) * 3600   int(minutes) * 60   int(seconds)   int(milliseconds) / 1000
    return total_seconds

def transcribe_video(video_path):
    """Transcribe the video using Whisper model and return the transcript."""
    compute_type = "float32"
    model = whisperx.load_model("large-v2", device='cpu', compute_type=compute_type)
    audio = whisperx.load_audio(video_path)
    result = model.transcribe(audio, batch_size=4, language="en")
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device='cpu')
    aligned_result = whisperx.align(result["segments"], model_a, metadata, audio, 'cpu', return_char_alignments=False)
    segments = aligned_result["segments"]
    transcript = []
    for index, segment in enumerate(segments):
        start_time = str(0)   str(timedelta(seconds=int(segment['start'])))   ',000'
        end_time = str(0)   str(timedelta(seconds=int(segment['end'])))   ',000'
        text = segment['text']
        segment_text = {
            "index": index   1,
            "start_time": start_time,
            "end_time": end_time,
            "text": text.strip(),
        }
        transcript.append(segment_text)
    return transcript

def extract_clips(video_path, transcript, stages):
    """Extract clips from the video based on the transcript and stages."""
    base_filename = os.path.splitext(os.path.basename(video_path))[0]
    clip_index = 0
    current_stage = None
    start_time = None
    partial_transcript = []

    for segment in transcript:
        segment_text = segment["text"].lower()
        for stage in stages:
            if stage in segment_text:
                if current_stage is not None:
                    end_time = convert_time_to_seconds(segment["start_time"])
                    output_clip_filename = f"{base_filename}.{current_stage}.mp4"
                    output_clip = os.path.join(clip_output_dir, output_clip_filename)
                    if not os.path.exists(output_clip):
                        try:
                            ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                            log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
                        except ffmpeg.Error as e:
                            log_and_print(f"Error extracting clip: {e}")

                        transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
                        transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
                        with open(transcript_path, 'w', encoding='utf-8') as f:
                            json.dump(transcript_text, f, ensure_ascii=False, indent=4)
                        log_and_print(f"Saved partial transcript to {transcript_path}")

                        partial_transcript = []

                current_stage = stage
                start_time = convert_time_to_seconds(segment["start_time"])
            partial_transcript.append(segment)

    if current_stage is not None:
        end_time = convert_time_to_seconds(transcript[-1]["end_time"])
        output_clip_filename = f"{base_filename}.{current_stage}.mp4"
        output_clip = os.path.join(clip_output_dir, output_clip_filename)
        if not os.path.exists(output_clip):
            try:
                ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264',  pix_fmt='yuv420p').run(overwrite_output=True)
                log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}")
            except ffmpeg.Error as e:
                log_and_print(f"Error extracting clip: {e}")

            transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript])
            transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json")
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript_text, f, ensure_ascii=False, indent=4)
            log_and_print(f"Saved partial transcript to {transcript_path}")

def process_transcripts(input_dir, transcript_dir, stages):
    """Process each video file to generate transcripts and extract clips."""
    video_files = [f for f in os.listdir(input_dir) if f.endswith('.mp4') or f.endswith('.MOV') or f.endswith('.mov')]

    for video_file in video_files:
        video_path = os.path.join(input_dir, video_file)
        transcript_path = os.path.join(transcript_dir, os.path.splitext(video_file)[0]   ".json")

        if not os.path.exists(transcript_path):
            transcript = transcribe_video(video_path)
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript, f, ensure_ascii=False, indent=4)
            log_and_print(f"Created transcript for {video_path}")
        else:
            with open(transcript_path, 'r', encoding='utf-8') as f:
                transcript = json.load(f)

        extract_clips(video_path, transcript, stages)

process_transcripts(root, transcript_dir, stages)

키워드 및 해시태그

  • 키워드: 전사, 비디오 처리, 클리핑, WhisperX, 자동화, 스테이지, 비디오 클립
  • 해시태그: #TranscriptionTool #VideoProcessing #ClippingTool #WhisperX #VideoAutomation #StageDetection #VideoClips

------------EOF------------

캐나다 중서부의 Tim이 제작했습니다.
2024.
이 문서는 GPL 라이센스를 받았습니다.

릴리스 선언문 이 글은 https://dev.to/fosteman/custom-transcription-and-clipping-pipeline-2814?1에서 복제됩니다.1 침해 내용이 있는 경우, [email protected]으로 연락하여 삭제하시기 바랍니다.
최신 튜토리얼 더>

부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.

Copyright© 2022 湘ICP备2022001581号-3