让我们从基础知识开始。 Coroutines是可以暂停和恢复的特殊功能,可以进行合作多任务处理。它们是Python异步/等待语法的基础。当您定义coroutine时,您本质上是创建一个可以将控制回到事件循环的函数,从而允许其他任务运行。
类自定义: def __init __(自我,价值): self.value =值 def __await __(自我): 屈服 返回self.value 异步def use_custom_awaitable(): 结果=等待自定义(42) 打印(结果)#输出:42
这个自定义的类可以与等待关键字一起使用,就像内置的期待类一样。当等待时,它会产生一次控制,然后返回其价值。
但是,如果我们想创建更复杂的异步原语怎么办?让我们看实现自定义信号量。信号量用于控制多个coroutines对共享资源的访问:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
此customSemaphore类实现了获取和发布方法,以及async Context Manager协议(
aenter aexit
)。它最多允许两个Coroutines同时获得信号量。
现在,让我们谈谈创建有效的事件循环。虽然Python的Asyncio提供了强大的事件循环实现,但可能在某些情况下需要自定义。这是自定义事件循环的基本示例:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value 导入时间 从藏品进口Deque 班级CustomeVentloop: def __init __(自我): self.__ready = deque() self._stopping = false def call_soon(self,callback, *args): self.__ready.append(((回调,args))) def run_forever(self): 虽然不是自我。_STOPPING: self._run_once() def _run_once(self): ntodo = len(self.__ready) 对于_范围(ntodo): 回调,args = self._ready.popleft() 回调(*args) def停止(自我): self._stopping = true def run_until_complete(self,coro): def _done_callback(fut): self.stop() 任务= self.create_task(Coro) task.add_done_callback(_DONE_CALLBACK) self.run_forever() 返回task.result() def create_task(self,coro): 任务=任务(Coro,Self) self.call_soon(task._step) 返回任务 班级任务: def __init __(self,coro,loop): self._coro = coro self._loop =循环 self._done = false self._result =无 self._callbacks = [] def _step(self): 尝试: 如果self._done: 返回 结果= self._coro.send(无) 如果Isinstance(结果,睡觉): result._task = self self._loop.call_soon(result._wake_up) 别的: self._loop.call_soon(self._step) 除了停止以E: self.set_result(e.value) def set_result(self,结果): self._result =结果 self._done = true 对于self._callbacks中的回调: self._loop.call_soon(回调,self) def add_done_callback(self,回调): 如果self._done: self._loop.call_soon(回调,self) 否则: self._callbacks.append(回调) def结果(自我): 如果不是self._done: 提高RuntimeRor(“未完成任务”) 返回self._result 班级睡觉: def __init __(自我,持续时间): self._duration =持续时间 self._task =无 self._start_time = time.time() def _wake_up(self): 如果time.time() - self._start_time> = self._duration: self._task._loop.call_soon(self._task._step) 别的: self._task._loop.call_soon(self._wake_up) 异步def睡眠(持续时间): 返回睡眠(持续时间) 异步def示例(): 打印(“开始”) 等待睡眠(1) 打印(“ 1秒后”) 等待睡眠(2) 打印(“再过2秒后”) 返回“完成” loop = customeventloop() 结果= loop.run_until_complete(example()) 打印(结果)此自定义事件循环实现了基本功能,例如运行任务,处理Coroutines,甚至简单的睡眠功能。它不像Python的内置事件循环那样丰富,而是演示了核心概念。 编写异步代码的挑战之一是管理任务优先级。虽然Python的Asyncio没有为任务提供内置优先级队列,但我们可以实现自己的工作: 导入asyncio 导入heapq class PriorityEventloop(asyncio.abstracteventloop): def __init __(自我): self.__ready = [] self._stopping = false self._clock = 0 def call_at(self,nes,sallback, *args,context = none): hander = asyncio.handle(回调,args,self,context) heapq.heappush(self._dready,(何时,句柄)) 返回手柄 def call_later(self,delay,pallback, *args,context = none): 返回self.call_at(self._clock延迟,回调, *args,context = context) def call_soon(self,callback, *args,context = none): 返回self.call_at(self._clock,callback, *args,context = context) def时间(自我): 返回self._clock def停止(自我): self._stopping = true def is_running(self): 返回不self._stopping def run_forever(self): self.__ready而不是self._stopping: self._run_once() def _run_once(self): 如果不是self._ready: 返回 何时handle = heapq.heappop(self.__ready) self._clock =何时 hander._run() def create_task(self,coro): 返回asyncio.task(coro,loop = self) def run_until_complete(self,future): asyncio.futures._chain_future(未来,self.create_future()) self.run_forever() 如果不是Future.done(): 提高RuntimeRor(“事件循环在将来完成之前停止。”) 返回future.result() def create_future(self): 返回asyncio.future(loop = self) 异步def low_priority_task(): 打印(“低优先任务开始”) 等待asyncio.sleep(2) 打印(“低优先任务完成”) 异步def high_priority_task(): 打印(“高优先任务开始”) 等待asyncio.sleep(1) 打印(“完成高优先级任务”) 异步def main(): loop = asyncio.get_event_loop() loop.call_later(0.1,loop.create_task,low_priority_task()) loop.call_later(0,loop.create_task,high_priority_task()) 等待ysyncio.sleep(3) asyncio.run(main())
此PriorityEventloop使用堆队列根据其计划的执行时间来管理任务。您可以通过安排不同延迟的任务来分配优先级。
优雅地处理取消是与Coroutines合作的另一个重要方面。这是如何实现可取消任务的示例:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)导入asyncio 异步def cancellable_operation(): 尝试: 打印(“操作开始”) 等待异步。Sleep(5) 打印(“完成操作已完成”) 除了asyncio.cancellederror: 打印(“取消操作”) #执行任何必要的清理 提高#重新打开comcellederror 异步def main(): task = asyncio.create_task(cancellable_operation()) 等待asyncio.sleep(2) task.cancel() 尝试: 等待任务 除了asyncio.cancellederror: 打印(“主:任务已取消”) asyncio.run(main())
在此示例中,Cancellable_operation捕获Cancellederror,执行任何必要的清理,然后重新提高异常。这允许在仍在传播取消状态的同时优雅地处理取消。
让我们探索实现自定义异步迭代器。这些对于创建可以迭代不同步的序列很有用:import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value 类Asyncrange: def __init __(自我,开始,停止,步骤= 1): self.start = start self.Stop =停止 self.step =步骤 def __aiter __(自我): 返回自我 异步def __anext __(self): 如果self.start> = self.Stop: 提高停止词 value = self.start self.start = self.Step 等待asyncio.sleep(0.1)#模拟一些异步工作 返回值 异步def main(): 异步在异步(0,5)中: 打印(i) asyncio.run(main())这个Asyncrange类实现了异步迭代器协议,允许在async中用于循环。
最后,让我们看实现自定义async上下文管理者。这些对于管理需要异步获得和发布的资源很有用:import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value 类Asyncresource: 异步def __aenter __(self): 打印(“获取资源”) 等待Asyncio.sleep(1)#模拟异步获取 返回自我 异步def __aexit __(self,exc_type,exc,tb): 打印(“发布资源”) 等待asyncio.sleep(1)#模拟异步释放 异步def main(): 异步与asyncresource()作为资源: 打印(“使用资源”) 等待asyncio.sleep(1) asyncio.run(main())这个asyncresource类实现
和
方法,允许它与async一起使用,并带有语句。 总之,Python的Coroutine系统为构建自定义异步原语的基础提供了强大的基础。通过了解基本机制和协议,您可以为特定的异步挑战创建量身定制的解决方案,优化复杂的并发场景中的性能,并扩展Python的异步功能。请记住,尽管这些自定义实现非常适合学习和特定用例,但Python的内置异步库是高度优化的,对于大多数情况,应该是您的首选。快乐的编码!import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value 我们的创作一定要查看我们的创作:
|
投资者centralclass AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(1) # Simulate async acquisition return self async def __aexit__(self, exc_type, exc, tb): print("Releasing resource") await asyncio.sleep(1) # Simulate async release async def main(): async with AsyncResource() as resource: print("Using resource") await asyncio.sleep(1) asyncio.run(main())| [2令人困惑的神秘 | Hindutva | 精英开发
|
我们在中等[2 [2
投资者中央媒介|
令人困惑的神秘媒介| | [2
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3