」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 使用 Asyncio 建立和管理任務

使用 Asyncio 建立和管理任務

發佈於2024-08-18
瀏覽:708

Creating and Managing Tasks with Asyncio

Asyncio allows developers to write asynchronous programs in Python without hassle. The module also provides many ways asynchronous tasks and with the multitude of ways to do it, it can become confusing on which one to use.

In this article, we will discuss the many ways you can create and manage tasks with asyncio.

What is an asyncio task?

In asyncio, a task is an object that wraps a coroutine and schedules it to run within the event loop. Simply put, a task is a way to run a coroutine concurrently with other tasks. Once a task is created, the event loop runs it, pausing and resuming it as necessary to allow other tasks to run.

Methods for Creating and Managing Asyncio Tasks

Now, we can discuss methods for creating and managing tasks. First, to create a task in Python using asyncio, you use the asyncio.create_task method which takes the following arguments:

  • coro (required): The coroutine object to be scheduled. This is the function you want to run asynchronously.

  • name (optional): A name for the task that can be useful for debugging or logging purposes. You can assign a string to this parameter.

    • You can also set or get the name later using Task.set_name(name) and Task.get_name().
  • context (optional): Introduced in Python 3.11, this is used to set a context variable for the task, enabling task-local storage. It’s similar to thread-local storage but for asyncio tasks.

    • This argument is not commonly used unless you're dealing with advanced scenarios that require context management.

Here is an example of the usage of asyncio.create_task:

import asyncio

# Define a coroutine
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O-bound operation
    print(f"Hello, {name}!")

async def main():
    # Create tasks
    task1 = asyncio.create_task(greet("Alice"), name="GreetingAlice")
    task2 = asyncio.create_task(greet("Bob"), name="GreetingBob")

    # Check task names
    print(f"Task 1 name: {task1.get_name()}")
    print(f"Task 2 name: {task2.get_name()}")

    # Wait for both tasks to complete
    await task1
    await task2

# Run the main function
asyncio.run(main())

When you create a task, you can execute many methods such as:

  • .cancel(): to cancel the task.

  • .add_done_callback(cb): to add a callback function that runs when the task is done.

  • .done(): to check if the task is completed.

  • .result(): to retrieve the result of the task after it’s completed.

Now that we understand how to create a task, let's see how to handle waiting for one task or a multitude of tasks.

Waiting for Tasks completion

In this section, we will discuss how to wait for a task completion, for one or many tasks. Asynchronous programming is based on the fact that we can continue the execution of a program if we have an asynchronous task running. There might be times when you want to control better the flow and want to ensure that you have a result that you can work with before safely continuing the execution of the program.

To wait for a single task completion, you can use asyncio.wait_for. It takes two arguments:

  • awaitable (required): This is the coroutine, task, or future that you want to wait for. It can be any object that can be awaited, like a coroutine function call, an asyncio.Task, or an asyncio.Future.

  • timeout (optional): This specifies the maximum number of seconds to wait for the aw to complete. If the timeout is reached and the awaitable has not completed, asyncio.wait_for raises a TimeoutError. If timeout is set to None, the function will wait indefinitely for the awaitable to complete.

Here is an example where this method is used:

import asyncio

async def slow_task():
    print("Task started...")
    await asyncio.sleep(5)  # Simulating a long-running task
    print("Task finished!")
    return "Completed"

async def main():
    try:
        # Wait for slow_task to finish within 2 seconds
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("The task took too long and was canceled!")

asyncio.run(main())

In the code above, slow_task() is a coroutine that simulates a long-running task by sleeping for 5 seconds. The line asyncio.wait_for(slow_task(), timeout=2) waits for the task to complete but limits the wait to 2 seconds, causing a timeout since the task takes longer. When the timeout is exceeded, a TimeoutError is raised, the task is canceled, and the exception is handled by printing a message indicating the task took too long.

We can also wait for multiple or a group of tasks to complete. This is possible using asyncio.wait, asyncio.gather or asyncio.as_completed. Let's explore each method.

asyncio.wait

The asyncio.wait method waits for a collection of tasks and returns two sets: one for completed tasks and one for pending tasks. It takes the following arguments:

  • aws (required, iterable of awaitables): A collection of coroutine objects, tasks, or futures that you want to wait for.

  • timeout (float or None, optional): The maximum number of seconds to wait. If not provided, it waits indefinitely.

  • return_when (constant, optional): Specifies when asyncio.wait should return. Options include:

    • asyncio.ALL_COMPLETED (default): Returns when all tasks are complete.
    • asyncio.FIRST_COMPLETED: Returns when the first task is completed.
    • asyncio.FIRST_EXCEPTION: Returns when the first task raises an exception.

Let's see how it is used in an example.

import asyncio
import random

async def task():
    await asyncio.sleep(random.uniform(1, 3))

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done tasks: {len(done)}, Pending tasks: {len(pending)}")

asyncio.run(main())

In the code above, asyncio.wait waits for a group of tasks and returns two sets: one with completed tasks and another with those still pending. You can control when it returns, such as after the first task is completed or after all tasks are done. In the example, asyncio.wait returns when the first task is completed, leaving the rest in the pending set.

asyncio.gather

The asyncio.gather method runs multiple awaitable objects concurrently and returns a list of their results, optionally handling exceptions. Let's see the arguments it takes.

  • *aws (required, multiple awaitables): A variable number of awaitable objects (like coroutines, tasks, or futures) to run concurrently.

  • return_exceptions (bool, optional): If True, exceptions in the tasks will be returned as part of the results list instead of being raised.

Let's see how it can be used in an example.

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())

In the code above, asyncio.gather runs multiple awaitable objects concurrently and returns a list of their results in the order they were passed in. It allows you to handle exceptions gracefully if return_exceptions is set to True. In the example, three tasks are run simultaneously, and their results are returned in a list once all tasks are complete.

asyncio.as_completed

The asyncio.as_completed method is used to return an iterator that yields tasks as they are completed, allowing results to be processed immediately. It takes the following arguments:

  • aws (iterable of awaitables): A collection of coroutine objects, tasks, or futures.

  • timeout (float or None, optional): The maximum number of seconds to wait for tasks to complete. If not provided, it waits indefinitely.

Example

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    tasks = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

In the example above, asyncio.as_completed returns an iterator that yields results as each task completes, allowing you to process them immediately. This is useful when you want to handle results as soon as they're available, rather than waiting for all tasks to finish. In the example, the tasks are run simultaneously, and their results are printed as each one finishes, in the order they complete.

So to make a summary, you use:

  • asyncio.wait: when you need to handle multiple tasks and want to track which tasks are completed and which are still pending. It's useful when you care about the status of each task separately.

  • asyncio.gather: when you want to run multiple tasks concurrently and need the results in a list, especially when the order of results matters or you need to handle exceptions gracefully.

  • asyncio.as_completed: when you want to process results as soon as each task finishes, rather than waiting for all tasks to complete. It’s useful for handling results in the order they become available.

However, these methods don't take atomic task management with built-in error handling. In the next section, we will see about asyncio.TaskGroup and how to use it to manage a group of tasks.

asyncio.TaskGroup

asyncio.TaskGroup is a context manager introduced in Python 3.11 that simplifies managing multiple tasks as a group. It ensures that if any task within the group fails, all other tasks are canceled, providing a way to handle complex task management with robust error handling. The class has one method called created_task used to create and add tasks to the task group. You pass a coroutine to this method, and it returns an asyncio.Task object that is managed by the group.

Here is an example of how it is used:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 done"

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("An error occurred")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(task1())
            task2 = tg.create_task(task2())
            error_task = tg.create_task(task_with_error())
    except Exception as e:
        print(f"Error: {e}")

    # Print results from completed tasks
    print("Task 1 result:", task1.result())
    print("Task 2 result:", task2.result())

asyncio.run(main())

asyncio.TaskGroup manages multiple tasks and ensures that if any task fails, all other tasks in the group are canceled. In the example, a task with an error causes the entire group to be canceled, and only the results of completed tasks are printed.

Usage for this can be in web scraping. You can use asyncio.TaskGroup to handle multiple concurrent API requests and ensure that if any request fails, all other requests are canceled to avoid incomplete data.

We are at the end of the article and we have learned the multiple methods asyncio provides to create and manage tasks. Here is a summary of the methods:

  • asyncio.wait_for: Wait for a task with a timeout.

  • asyncio.wait: Wait for multiple tasks with flexible completion conditions.

  • asyncio.gather: Aggregate multiple tasks into a single awaitable.

  • asyncio.as_completed: Handle tasks as they are completed.

  • asyncio.TaskGroup: Manage a group of tasks with automatic cancellation on failure.

Conclusion

Asynchronous programming can transform the way you handle concurrent tasks in Python, making your code more efficient and responsive. In this article, we've navigated through the various methods provided by asyncio to create and manage tasks, from simple timeouts to sophisticated task groups. Understanding when and how to use each method—asyncio.wait_for, asyncio.wait, asyncio.gather, asyncio.as_completed, and asyncio.TaskGroup—will help you harness the full potential of asynchronous programming, making your applications more robust and scalable.

For a deeper dive into asynchronous programming and more practical examples, explore our detailed guide here.

If you enjoyed this article, consider subscribing to my newsletter so you don't miss out on future updates.

Happy coding!

版本聲明 本文轉載於:https://dev.to/koladev/creating-and-managing-tasks-with-asyncio-4kjl?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • Javascript 很難(有悲傷)
    Javascript 很難(有悲傷)
    这将是一个很长的阅读,但让我再说一遍。 JAVASCRIPT很难。上次我们见面时,我正在踏入 Javascript 的世界,一个眼睛明亮、充满希望的程序员踏入野生丛林,说“这能有多难?”。我错得有多离谱??事情变得更难了,我(勉强)活了下来,这是关于我的旅程的一个小混乱的故事。 变量:疯狂的开始 ...
    程式設計 發佈於2024-11-06
  • ## 您可以在不使用 JavaScript 的情況下使用 CSS 建立餅圖分段嗎?
    ## 您可以在不使用 JavaScript 的情況下使用 CSS 建立餅圖分段嗎?
    使用 CSS 在圓中分段使用 border-radius 在 CSS 中建立圓是一種常見的做法。但是,我們可以透過分段(類似餅圖)來實現類似的效果嗎?本文深入研究了僅透過 HTML 和 CSS 實現此目的的方法,不包括使用 JavaScript。 產生相等大小的段相等大小段的一種方法涉及產生以下內容...
    程式設計 發佈於2024-11-06
  • 從頭開始建立一個小型向量存儲
    從頭開始建立一個小型向量存儲
    With the evolving landscape of generative AI, vector databases are playing crucial role in powering generative AI applications. There are so many vect...
    程式設計 發佈於2024-11-06
  • 如何在Chrome使用AI實驗API
    如何在Chrome使用AI實驗API
    要在 Chrome 中使用實驗性 AI API,請依照下列步驟操作: 硬體需求 4GB 記憶體 GPU可用 至少 22GB 空間 Windows 10.11 或 macOS Ventura 或更新版本(無 Linux 規格) 尚不支持: Chrome作業系統 Chrome iOS C...
    程式設計 發佈於2024-11-06
  • 評論:Adam Johnson 的《Boost Your Django DX》
    評論:Adam Johnson 的《Boost Your Django DX》
    書評很微妙。您不想破壞它,但您也想讓潛在讀者體驗所期待的內容。這是提供背景和保持興趣之間的巧妙平衡。我試圖在這篇評論中達到這種平衡,為您提供足夠的內容來吸引您,而不透露太多。 一個小背景故事:我第一次從 Djangonaut Space 的好朋友 Tim 那裡聽說這本書,並將其添加到我的閱讀清單中...
    程式設計 發佈於2024-11-06
  • 如何將陣列元素分組並組合多維數組中另一列的值?
    如何將陣列元素分組並組合多維數組中另一列的值?
    按列將數組元素分組並組合另一列中的值給定一個包含兩列嵌套數組的數組,任務是將基於特定列的子數組,並將每個組中另一列的值連接起來,產生以逗號分隔的清單。 考慮以下範例陣列:$array = [ ["444", "0081"], ["44...
    程式設計 發佈於2024-11-06
  • 三個新加入的例外功能
    三個新加入的例外功能
    從 JDK 7 開始,異常處理已擴展為三個新功能:自動資源管理、多重捕獲和更準確的重新拋出。 多重catch可讓您使用同一個catch子句擷取多個異常,避免程式碼重複。 要使用多重捕獲,請指定由 | 分隔的異常清單。在 catch 子句中。每個參數都是隱式最終參數。 用法範例:catch(f...
    程式設計 發佈於2024-11-06
  • 如何修復執行 ES6 程式碼時出現「意外的令牌匯出」錯誤?
    如何修復執行 ES6 程式碼時出現「意外的令牌匯出」錯誤?
    「排除意外的令牌匯出錯誤」嘗試在專案中執行ES6 程式碼時,可能會出現「意外的令牌導出”錯誤。此錯誤表示所使用的環境不支援 ES6 模組中使用的匯出關鍵字語法。 錯誤詳細資料以下程式碼片段舉例說明了錯誤的來源: export class MyClass { constructor() { ...
    程式設計 發佈於2024-11-06
  • 即使卸載後,VSCode 擴充功能也不會從檔案系統中刪除,我建立了一個解決方案!
    即使卸載後,VSCode 擴充功能也不會從檔案系統中刪除,我建立了一個解決方案!
    所以這是基於 vscode 的編輯器的問題。即使您卸載了擴充功能,它也會保留在檔案系統中,並隨著時間的推移堵塞您的系統。我創建了一個簡單的解決方案。執行此 python 腳本將刪除 vscode 上未安裝的擴充功能。 它適用於 VS Code、VS Code Insiders,也適用於 VSCod...
    程式設計 發佈於2024-11-06
  • 透過 GitHub Actions 按計畫更新網站內容
    透過 GitHub Actions 按計畫更新網站內容
    我想分享我建立一個自我永續的內容管理系統的旅程,該系統不需要傳統意義上的內容資料庫。 問題 該網站的內容(部落格文章和書籤)儲存在 Notion 資料庫中: 附書籤的資料庫 –  Notion UI 我試圖解決的問題是不必在添加每個書籤後手動部署網站。最重要的是 - 保持託管盡可能...
    程式設計 發佈於2024-11-06
  • 如何在 Laravel 5 應用程式的共享託管環境中清除快取?
    如何在 Laravel 5 應用程式的共享託管環境中清除快取?
    如何從 Laravel 5 中的共享託管伺服器清除快取清除快取對於維護 Laravel 應用程式的效能和效率至關重要。但是,在您可能無法存取 CLI 的共享託管環境中,清除快取可能是一個挑戰。 清除視圖快取的解決方法在這種情況下,您可以透過在 CLI 之外呼叫 Artisan 命令來解決此問題。要清...
    程式設計 發佈於2024-11-06
  • 如何加速 Matplotlib 繪圖以提高效能?
    如何加速 Matplotlib 繪圖以提高效能?
    為什麼 Matplotlib 這麼慢? 在評估 Python 繪圖庫時,考慮效能很重要。 Matplotlib 是一個廣泛使用的函式庫,它看起來可能很緩慢,引發了關於加快速度或探索替代選項的問題。讓我們深入研究這個問題並探索可能的解決方案。 提供的範例展示了具有多個子圖和資料更新的圖。使用 Matp...
    程式設計 發佈於2024-11-06
  • 使用畫布調整影像大小時如何克服鋸齒狀邊緣和模糊結果?
    使用畫布調整影像大小時如何克服鋸齒狀邊緣和模糊結果?
    解決在JavaScript 中使用Canvas 調整影像大小時的平滑問題在JavaScript 中使用Canvas 調整影像大小有時會導致明顯的鋸齒狀邊緣或模糊。為了實現平滑的調整大小,可以採用一種稱為向下步進的技術。 在大多數瀏覽器中,預設使用線性內插法來調整大小。雙三次插值可產生更平滑的結果,涉...
    程式設計 發佈於2024-11-06
  • 如何解決 MySQL C# 中的文字編碼問題?
    如何解決 MySQL C# 中的文字編碼問題?
    修復MySQL C# 中的文字編碼問題使用實體框架在C# 中處理MySQL 資料庫時,使用者可能會遇到文字編碼問題,特別是帶有特殊字符,例如“ë”,渲染不正確。本文探討了解決此常見問題的最合適的解決方案。 要修正編碼問題,必須執行以下操作:驗證排序規則設定: 確保所涉及的資料庫或表的排序規則與UTF...
    程式設計 發佈於2024-11-06
  • 如何將美麗搜尋與 Node.js 集成
    如何將美麗搜尋與 Node.js 集成
    作為 Node.js 開發人員,建立能夠提供快速且準確的搜尋結果的應用程式非常重要。使用者期望立即得到相關的回應,但實現起來可能具有挑戰性,特別是在處理大型資料集時。 這就是美麗搜尋的用武之地——一個為輕鬆滿足這些需求而構建的搜尋引擎。 什麼是美麗搜尋? Meilisearch ...
    程式設計 發佈於2024-11-06

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3