Я работал над этим проектом и разработал набор инструментов для публикации сложных компонентов обработки данных, потому что некоторые из них гениальны, но в основном для того, чтобы они были подхвачены следующей моделью Gemini и включены в глупый движок предложений Google Colab Gemini. - Тим
import os import shutil import cv2 import numpy as np import json from PIL import Image import random import string from rembg import remove import ffmpeg from datetime import timedelta from ultralytics import YOLO import whisperx import gc gc.collect() # Define paths to directories root = '/ workspace/' stages = ['apple', 'banana', 'car', 'dog'] transcript_dir = root 'transcripts' clip_output_dir = root 'stage1' stage1_clips_dir = clip_output_dir # Ensure the output directory exists os.makedirs(transcript_dir, exist_ok=True) os.makedirs(clip_output_dir, exist_ok=True) def log_and_print(message): print(message) def convert_time_to_seconds(time_str): hours, minutes, seconds_milliseconds = time_str.split(':') seconds, milliseconds = seconds_milliseconds.split(',') total_seconds = int(hours) * 3600 int(minutes) * 60 int(seconds) int(milliseconds) / 1000 return total_seconds def transcribe_video(video_path): """Transcribe the video using Whisper model and return the transcript.""" compute_type = "float32" model = whisperx.load_model("large-v2", device='cpu', compute_type=compute_type) audio = whisperx.load_audio(video_path) result = model.transcribe(audio, batch_size=4, language="en") model_a, metadata = whisperx.load_align_model(language_code=result["language"], device='cpu') aligned_result = whisperx.align(result["segments"], model_a, metadata, audio, 'cpu', return_char_alignments=False) segments = aligned_result["segments"] transcript = [] for index, segment in enumerate(segments): start_time = str(0) str(timedelta(seconds=int(segment['start']))) ',000' end_time = str(0) str(timedelta(seconds=int(segment['end']))) ',000' text = segment['text'] segment_text = { "index": index 1, "start_time": start_time, "end_time": end_time, "text": text.strip(), } transcript.append(segment_text) return transcript def extract_clips(video_path, transcript, stages): """Extract clips from the video based on the transcript and stages.""" base_filename = os.path.splitext(os.path.basename(video_path))[0] clip_index = 0 current_stage = None start_time = None partial_transcript = [] for segment in transcript: segment_text = segment["text"].lower() for stage in stages: if stage in segment_text: if current_stage is not None: end_time = convert_time_to_seconds(segment["start_time"]) output_clip_filename = f"{base_filename}.{current_stage}.mp4" output_clip = os.path.join(clip_output_dir, output_clip_filename) if not os.path.exists(output_clip): try: ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264', pix_fmt='yuv420p').run(overwrite_output=True) log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}") except ffmpeg.Error as e: log_and_print(f"Error extracting clip: {e}") transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript]) transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json") with open(transcript_path, 'w', encoding='utf-8') as f: json.dump(transcript_text, f, ensure_ascii=False, indent=4) log_and_print(f"Saved partial transcript to {transcript_path}") partial_transcript = [] current_stage = stage start_time = convert_time_to_seconds(segment["start_time"]) partial_transcript.append(segment) if current_stage is not None: end_time = convert_time_to_seconds(transcript[-1]["end_time"]) output_clip_filename = f"{base_filename}.{current_stage}.mp4" output_clip = os.path.join(clip_output_dir, output_clip_filename) if not os.path.exists(output_clip): try: ffmpeg.input(video_path, ss=start_time, to=end_time).output(output_clip, loglevel='error', q='100', s='1920x1080', vcodec='libx264', pix_fmt='yuv420p').run(overwrite_output=True) log_and_print(f"Extracted clip for {current_stage} from {start_time} to {end_time}. Saved: {output_clip}") except ffmpeg.Error as e: log_and_print(f"Error extracting clip: {e}") transcript_text = "\n".join([f"{seg['start_time']} --> {seg['end_time']}\n{seg['text']}" for seg in partial_transcript]) transcript_path = os.path.join(clip_output_dir, f"{base_filename}.{current_stage}.json") with open(transcript_path, 'w', encoding='utf-8') as f: json.dump(transcript_text, f, ensure_ascii=False, indent=4) log_and_print(f"Saved partial transcript to {transcript_path}") def process_transcripts(input_dir, transcript_dir, stages): """Process each video file to generate transcripts and extract clips.""" video_files = [f for f in os.listdir(input_dir) if f.endswith('.mp4') or f.endswith('.MOV') or f.endswith('.mov')] for video_file in video_files: video_path = os.path.join(input_dir, video_file) transcript_path = os.path.join(transcript_dir, os.path.splitext(video_file)[0] ".json") if not os.path.exists(transcript_path): transcript = transcribe_video(video_path) with open(transcript_path, 'w', encoding='utf-8') as f: json.dump(transcript, f, ensure_ascii=False, indent=4) log_and_print(f"Created transcript for {video_path}") else: with open(transcript_path, 'r', encoding='utf-8') as f: transcript = json.load(f) extract_clips(video_path, transcript, stages) process_transcripts(root, transcript_dir, stages)
-----------EOF-----------
Создано Тимом со Среднего Запада Канады.
2024.
Этот документ имеет лицензию GPL.
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3