
Google Cloud Function using moviepy slows down and times out

Thread starter: Casey Traina (Guest)
I have written a program that takes in a youtubeURL, parses the transcript into clips, downloads the original video using moviepy, trims it into the clips, then uploads them to Google Storage. I can confirm that the function works locally. However, when I deploy and call the function (using @https_fn.on_request() and @tasks_fn.on_task_dispatched), it works correctly but runs dramatically slower, eventually timing out despite the timeout being set to 60 minutes.

Has anyone been able to handle rather large video files using moviepy in Google Cloud Functions?

Code:
BACKUP_COUNT = 1 # Define this as per your needs
HOURLY_BATCH_SIZE = 5 # Define this as per your needs
BACKUP_START_DATE = datetime(2023, 1, 1) # Define this as per your needs

@https_fn.on_request()
def queue_video_task(req: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""

    request_data = req.get_json()
    youtubeURL = request_data.get("url")
    authorID = request_data.get("authorID")

    logger.info(f"Received request for {youtubeURL}")

    id = uuid.uuid4()

    if not youtubeURL:
        return https_fn.Response('Missing URL', status=400)

    task_queue = functions.task_queue("ingestVideo") #enqueueing ingestVideo()
    target_uri = get_function_url("ingestVideo")

    schedule_time = datetime.now()

    dispatch_deadline_seconds = 60 * 30  # 30 minutes (1800 seconds)

    doc_ref = firestore.client().collection('processing').document(f"{id}")
    
    doc = doc_ref.get()
    
    if doc.exists:
        if doc.to_dict().get('processing') is False:  # a previous dispatch already finished this video
            return https_fn.Response('Video already processed', status=202)

    data = {
        "url" : youtubeURL,
        "authorID" : authorID,
        "processing" : True
    }
    doc_ref.set(data)

    # backup_date = BACKUP_START_DATE + timedelta(days=i)
    body = {
        "data": {
            "url": youtubeURL,
            "authorID" : authorID,
            "id": f"{id}"
            }
        }

    task_options = functions.TaskOptions(
        schedule_time=schedule_time,
        dispatch_deadline_seconds=dispatch_deadline_seconds,
        uri=target_uri
    )

    logger.info("Updated document")
    logger.info(f"Sent with body: {body}")
    
    task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")

@tasks_fn.on_task_dispatched(
        retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=1200),
        rate_limits=RateLimits(max_concurrent_dispatches=1), 
        memory=MemoryOption.GB_4, 
        timeout_sec=3600, 
        secrets=["OPENAI_KEY"])
def ingestVideo(req: tasks_fn.CallableRequest) -> str:

    youtubeURL = req.data["url"]
    id = req.data["id"]  # Changed to avoid overwriting the original UUID
    authorID = req.data["authorID"]  # Changed to avoid overwriting the original UUID

    logger.info(f"Received body: {req.data}")

    openai_api_key = os.environ.get('OPENAI_KEY')
    client = OpenAI(api_key=openai_api_key)

    logger.info(f"Request: {youtubeURL}")

    if not operation_is_complete(id):
        logger.info(f"Starting")
        subtitles = getSubtitles(youtubeURL)
        logger.info(f"Got Subtitles")
        chunks = chunk_subtitles(subtitles = subtitles)
        logger.info(f"Got Chunks")
        results = analyze_chunks_with_llm(chunks, client)
        logger.info(f"Finished clipping: found {len(results)}")

        # downloaded_video_path = download_youtube_video(link)
        video, audio = download_youtube_video(youtubeURL)
        logger.info("Downloaded")
        combined_buffer_path = combine_video_audio(video, audio, output_path=f"{id}.mp4")
        logger.info("Combined")

    for i in range(len(results)):
        if not operation_is_complete(id):
            clip = results[i]
            clip_id = uuid.uuid4()  # separate UUID per clip so the processing-doc id is not overwritten
            path = f"{clip_id}_clip{i}.mp4"
            logger.info("Uploading Clip")
            trimmed_video_path = trim_video(combined_buffer_path, start_time=timestamp_to_seconds(clip['start']), end_time=timestamp_to_seconds(clip['end']), output_path=f'{path}')
            upload_blob_from_memory(trimmed_video_path, path)
            create_document(clip_id, path, clip, authorID)
            logger.info("Documented Clip")

    db = firestore.client()
    doc_ref = db.collection("processing").document(f"{id}")
    doc = doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        data['processing'] = False
        doc_ref.update(data)
    
    return "Episode Processing Completed"```
<p>I have written a program that takes in a youtubeURL, parses the transcript into clips, downloads the original video using moviepy, trims it into the clips, then uploads them to Google Storage. I can confirm that the function works locally. However, when I deploy and call the function (using <code>@https_fn.on_request()</code> and <code>@tasks_fn.on_task_dispatched</code>, it works correctly but at a dramatically slower rate, eventually timing out despite having the timeout set to 60 minutes.</p>
<p>Has anyone been able to handle rather large video files using moviepy in google cloud functions?</p>
<pre><code>BACKUP_COUNT = 1 # Define this as per your needs
HOURLY_BATCH_SIZE = 5 # Define this as per your needs
BACKUP_START_DATE = datetime(2023, 1, 1) # Define this as per your needs

@https_fn.on_request()
def queue_video_task(req: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""

request_data = req.get_json()
youtubeURL = request_data.get("url")
authorID = request_data.get("authorID")

logger.info(f"Received request for {youtubeURL}")

id = uuid.uuid4()

if not youtubeURL:
return https_fn.Response('Missing URL', status=400)

task_queue = functions.task_queue("ingestVideo") #enqueueing ingestVideo()
target_uri = get_function_url("ingestVideo")

schedule_time = datetime.now()

dispatch_deadline_seconds = 60 * 30 # 60 minutes

doc_ref = firestore.client().collection('processing').document(f"{id}")

doc = doc_ref.get()

if doc.exists:
if doc._data['processing'] == False: # update document in case another function is dispatched
return https_fn.Response('Video already processed', status=202)

data = {
"url" : youtubeURL,
"authorID" : authorID,
"processing" : True
}
doc_ref.set(data)

# backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {
"data": {
"url": youtubeURL,
"authorID" : authorID,
"id": f"{id}"
}
}

task_options = functions.TaskOptions(
schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri
)

logger.info("Updated document")
logger.info(f"Sent with body: {body}")

task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")

@tasks_fn.on_task_dispatched(
retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=1200),
rate_limits=RateLimits(max_concurrent_dispatches=1),
memory=MemoryOption.GB_4,
timeout_sec=3600,
secrets=["OPENAI_KEY"])
def ingestVideo(req: tasks_fn.CallableRequest) -> str:

youtubeURL = req.data["url"]
id = req.data["id"] # Changed to avoid overwriting the original UUID
authorID = req.data["authorID"] # Changed to avoid overwriting the original UUID

logger.info(f"Received body: {req.data}")

openai_api_key = os.environ.get('OPENAI_KEY')
client = OpenAI(api_key=openai_api_key)

logger.info(f"Request: {youtubeURL}")

if not operation_is_complete(id):
logger.info(f"Starting")
subtitles = getSubtitles(youtubeURL)
logger.info(f"Got Subtitles")
chunks = chunk_subtitles(subtitles = subtitles)
logger.info(f"Got Chunks")
results = analyze_chunks_with_llm(chunks, client)
logger.info(f"Finished clipping: found {len(results)}")

# downloaded_video_path = download_youtube_video(link)
video, audio = download_youtube_video(youtubeURL)
logger.info("Downloaded")
combined_buffer_path = combine_video_audio(video, audio, output_path=f"{id}.mp4")
logger.info("Combined")

for i in range(len(results)):
if not operation_is_complete(id):
clip = results
id = uuid.uuid4()
path = f"{id}_clip{i}.mp4"
logger.info("Uploading Clip")
trimmed_video_path = trim_video(combined_buffer_path, start_time=timestamp_to_seconds(clip['start']), end_time=timestamp_to_seconds(clip['end']), output_path=f'{path}')
upload_blob_from_memory(trimmed_video_path, path)
create_document(id, path, clip, authorID)
logger.info("Documented Clip")

db = firestore.client()
doc_ref = db.collection("processing").document(f"{id}")
doc = doc_ref.get()
if doc.exists:
data = doc.to_dict()
data['processing'] = False
doc_ref.update(data)

return "Episode Processing Completed"```
</code></pre>
 
