」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 掌握Python協程:打造強力並發應用的自定義異步工具

掌握Python協程:打造強力並發應用的自定義異步工具

發佈於2025-04-29
瀏覽:907

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps让我们从基础知识开始。 Coroutines是可以暂停和恢复的特殊功能,可以进行合作多任务处理。它们是Python异步/等待语法的基础。当您定义coroutine时,您本质上是创建一个可以将控制回到事件循环的函数,从而允许其他任务运行。

要创建一个自定义的等待对象,您需要实现

类自定义: def __init __(自我,价值): self.value =值 def __await __(自我): 屈服 返回self.value 异步def use_custom_awaitable(): 结果=等待自定义(42) 打印(结果)#输出:42

这个自定义的类可以与等待关键字一起使用,就像内置的期待类一样。当等待时,它会产生一次控制,然后返回其价值。 但是,如果我们想创建更复杂的异步原语怎么办?让我们看实现自定义信号量。信号量用于控制多个coroutines对共享资源的访问:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

此customSemaphore类实现了获取和发布方法,以及async Context Manager协议(

aenter

aexit
)。它最多允许两个Coroutines同时获得信号量。 现在,让我们谈谈创建有效的事件循环。虽然Python的Asyncio提供了强大的事件循环实现,但可能在某些情况下需要自定义。这是自定义事件循环的基本示例:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 导入时间
从藏品进口Deque

班级CustomeVentloop:
    def __init __(自我):
        self.__ready = deque()
        self._stopping = false

    def call_soon(self,callback, *args):
        self.__ready.append(((回调,args)))

    def run_forever(self):
        虽然不是自我。_STOPPING:
            self._run_once()

    def _run_once(self):
        ntodo = len(self.__ready)
        对于_范围(ntodo):
            回调,args = self._ready.popleft()
            回调(*args)

    def停止(自我):
        self._stopping = true

    def run_until_complete(self,coro):
        def _done_callback(fut):
            self.stop()

        任务= self.create_task(Coro)
        task.add_done_callback(_DONE_CALLBACK)
        self.run_forever()
        返回task.result()

    def create_task(self,coro):
        任务=任务(Coro,Self)
        self.call_soon(task._step)
        返回任务

班级任务:
    def __init __(self,coro,loop):
        self._coro = coro
        self._loop =循环
        self._done = false
        self._result =无
        self._callbacks = []

    def _step(self):
        尝试:
            如果self._done:
                返回
            结果= self._coro.send(无)
            如果Isinstance(结果,睡觉):
                result._task = self
                self._loop.call_soon(result._wake_up)
            别的:
                self._loop.call_soon(self._step)
        除了停止以E:
            self.set_result(e.value)

    def set_result(self,结果):
        self._result =结果
        self._done = true
        对于self._callbacks中的回调:
            self._loop.call_soon(回调,self)

    def add_done_callback(self,回调):
        如果self._done:
            self._loop.call_soon(回调,self)
        否则:
            self._callbacks.append(回调)

    def结果(自我):
        如果不是self._done:
            提高RuntimeRor(“未完成任务”)
        返回self._result

班级睡觉:
    def __init __(自我,持续时间):
        self._duration =持续时间
        self._task =无
        self._start_time = time.time()

    def _wake_up(self):
        如果time.time() -  self._start_time> = self._duration:
            self._task._loop.call_soon(self._task._step)
        别的:
            self._task._loop.call_soon(self._wake_up)

异步def睡眠(持续时间):
    返回睡眠(持续时间)

异步def示例():
    打印(“开始”)
    等待睡眠(1)
    打印(“ 1秒后”)
    等待睡眠(2)
    打印(“再过2秒后”)
    返回“完成”

loop = customeventloop()
结果= loop.run_until_complete(example())
打印(结果)

此自定义事件循环实现了基本功能,例如运行任务,处理Coroutines,甚至简单的睡眠功能。它不像Python的内置事件循环那样丰富,而是演示了核心概念。 编写异步代码的挑战之一是管理任务优先级。虽然Python的Asyncio没有为任务提供内置优先级队列,但我们可以实现自己的工作: 导入asyncio 导入heapq class PriorityEventloop(asyncio.abstracteventloop): def __init __(自我): self.__ready = [] self._stopping = false self._clock = 0 def call_at(self,nes,sallback, *args,context = none): hander = asyncio.handle(回调,args,self,context) heapq.heappush(self._dready,(何时,句柄)) 返回手柄 def call_later(self,delay,pallback, *args,context = none): 返回self.call_at(self._clock延迟,回调, *args,context = context) def call_soon(self,callback, *args,context = none): 返回self.call_at(self._clock,callback, *args,context = context) def时间(自我): 返回self._clock def停止(自我): self._stopping = true def is_running(self): 返回不self._stopping def run_forever(self): self.__ready而不是self._stopping: self._run_once() def _run_once(self): 如果不是self._ready: 返回 何时handle = heapq.heappop(self.__ready) self._clock =何时 hander._run() def create_task(self,coro): 返回asyncio.task(coro,loop = self) def run_until_complete(self,future): asyncio.futures._chain_future(未来,self.create_future()) self.run_forever() 如果不是Future.done(): 提高RuntimeRor(“事件循环在将来完成之前停止。”) 返回future.result() def create_future(self): 返回asyncio.future(loop = self) 异步def low_priority_task(): 打印(“低优先任务开始”) 等待asyncio.sleep(2) 打印(“低优先任务完成”) 异步def high_priority_task(): 打印(“高优先任务开始”) 等待asyncio.sleep(1) 打印(“完成高优先级任务”) 异步def main(): loop = asyncio.get_event_loop() loop.call_later(0.1,loop.create_task,low_priority_task()) loop.call_later(0,loop.create_task,high_priority_task()) 等待ysyncio.sleep(3) asyncio.run(main())

此PriorityEventloop使用堆队列根据其计划的执行时间来管理任务。您可以通过安排不同延迟的任务来分配优先级。

优雅地处理取消是与Coroutines合作的另一个重要方面。这是如何实现可取消任务的示例:
import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

导入asyncio 异步def cancellable_operation(): 尝试: 打印(“操作开始”) 等待异步。Sleep(5) 打印(“完成操作已完成”) 除了asyncio.cancellederror: 打印(“取消操作”) #执行任何必要的清理 提高#重新打开comcellederror 异步def main(): task = asyncio.create_task(cancellable_operation()) 等待asyncio.sleep(2) task.cancel() 尝试: 等待任务 除了asyncio.cancellederror: 打印(“主:任务已取消”) asyncio.run(main())

在此示例中,Cancellable_operation捕获Cancellederror,执行任何必要的清理,然后重新提高异常。这允许在仍在传播取消状态的同时优雅地处理取消。
让我们探索实现自定义异步迭代器。这些对于创建可以迭代不同步的序列很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 类Asyncrange:
    def __init __(自我,开始,停止,步骤= 1):
        self.start = start
        self.Stop =停止
        self.step =步骤

    def __aiter __(自我):
        返回自我

    异步def __anext __(self):
        如果self.start> = self.Stop:
            提高停止词
        value = self.start
        self.start = self.Step
        等待asyncio.sleep(0.1)#模拟一些异步工作
        返回值

异步def main():
    异步在异步(0,5)中:
        打印(i)

asyncio.run(main())

这个Asyncrange类实现了异步迭代器协议,允许在async中用于循环。


最后,让我们看实现自定义async上下文管理者。这些对于管理需要异步获得和发布的资源很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 类Asyncresource:
    异步def __aenter __(self):
        打印(“获取资源”)
        等待Asyncio.sleep(1)#模拟异步获取
        返回自我

    异步def __aexit __(self,exc_type,exc,tb):
        打印(“发布资源”)
        等待asyncio.sleep(1)#模拟异步释放

异步def main():
    异步与asyncresource()作为资源:
        打印(“使用资源”)
        等待asyncio.sleep(1)

asyncio.run(main())

这个asyncresource类实现


方法,允许它与async一起使用,并带有语句。 总之,Python的Coroutine系统为构建自定义异步原语的基础提供了强大的基础。通过了解基本机制和协议,您可以为特定的异步挑战创建量身定制的解决方案,优化复杂的并发场景中的性能,并扩展Python的异步功能。请记住,尽管这些自定义实现非常适合学习和特定用例,但Python的内置异步库是高度优化的,对于大多数情况,应该是您的首选。快乐的编码!

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 
  
  
  我们的创作

一定要查看我们的创作:


投资者central

|
class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async acquisition
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async release

async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(1)

asyncio.run(main())
| [2

令人困惑的神秘 | Hindutva | 精英开发

|


我们在中等

[2 [2

投资者中央媒介

|

令人困惑的神秘媒介

| | [2

版本聲明 本文轉載於:https://dev.to/aaravjoshi/master-python-coroutines-create-custom-async-tools-for-powerful-concurrent-apps-1dpc?1如有侵犯,請聯繫[email protected]刪除
最新教學 更多>

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3