coroutines في Python هي أداة قوية لكتابة الكود غير المتزامن. لقد أحدثوا ثورة في كيفية التعامل مع العمليات المتزامنة ، مما يسهل بناء تطبيقات قابلة للتطوير وفعالة. لقد أمضيت الكثير من الوقت في العمل مع Coroutines ، وأنا متحمس لمشاركة بعض الأفكار حول إنشاء بدائل غير متزامنة مخصصة.
لنبدأ بالأساسيات. Coroutines هي وظائف خاصة يمكن إيقافها وإعادة تأمينها ، مما يسمح بمهام متعددة التعاون. إنها أساس بناء جملة Async/Await's Python. عندما تحدد coroutine ، فأنت تنشئ بشكل أساسي وظيفة يمكن أن تعطي التحكم في حلقة الحدث ، مما يسمح للمهام الأخرى بالتشغيل.
لإنشاء كائن قابل للانتظار مخصص ، تحتاج إلى تنفيذ طريقة في انتظار . يجب أن تعيد هذه الطريقة مؤلف. إليك مثال بسيط:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
يمكن استخدام هذه الفئة المخصصة مع الكلمة الرئيسية في انتظار ، تمامًا مثل Awaitables المدمجة. عندما تنتظر ، فإنه يعطي التحكم مرة واحدة ، ثم يعيد قيمته.
ولكن ماذا لو كنا نريد إنشاء بدايات غير متزامنة أكثر تعقيدًا؟ دعونا نلقي نظرة على تنفيذ إشارة مخصصة. تُستخدم Semaphores للتحكم في الوصول إلى مورد مشترك بواسطة Coroutines متعددة:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._valueتنفذ فئة CustomSemaphore هذه أساليب الاستحواذ والإفراج ، بالإضافة إلى بروتوكول مدير سياق ASYNC ( AENTER و aexit ). إنها تسمح بحد أقصى اثنين من coroutines بالحصول على semaphore في وقت واحد.
الآن ، دعنا نتحدث عن إنشاء حلقات أحداث فعالة. في حين أن Python's Asyncio يوفر تطبيقًا قويًا للحدث ، فقد تكون هناك حالات تحتاج فيها إلى حالة مخصصة. إليك مثال أساسي على حلقة حدث مخصصة:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)تنفذ حلقة الحدث المخصصة هذه الوظائف الأساسية مثل تشغيل المهام ، والتعامل مع coroutines ، وحتى وظيفة النوم البسيطة. إنها ليست غنية بالميزات مثل حلقة الأحداث المدمجة في Python ، ولكنها توضح المفاهيم الأساسية.
أحد التحديات في كتابة الكود غير المتزامن هو إدارة أول أولويات المهمة. على الرغم من أن Asyncio من Python لا يوفر قوائم أولوية مدمجة للمهام ، يمكننا تنفيذنا:
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())يستخدم هذا priorityeventloop قائمة انتظار كومة لإدارة المهام بناءً على وقت التنفيذ المجدول. يمكنك تعيين الأولويات من خلال جدولة المهام مع تأخيرات مختلفة.
التعامل مع الإلغاء بأمان هو جانب آخر مهم في العمل مع coroutines. إليك مثال على كيفية تنفيذ المهام القابلة للإلغاء:
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())في هذا المثال ، يمسك cancellable_operation بـ cancellederror ، ويؤدي أي تنظيف ضروري ، ثم يعيد إعادة الاستثناء. هذا يسمح بالتعامل الرشيق للإلغاء مع استمرار نشر حالة الإلغاء.
دعنا نستكشف تنفيذ تكرار ASYNC المخصص. هذه مفيدة لإنشاء تسلسلات يمكن تكرارها بشكل غير متزامن:
class AsyncRange: def __init__(self, start, stop, step=1): self.start = start self.stop = stop self.step = step def __aiter__(self): return self async def __anext__(self): if self.start >= self.stop: raise StopAsyncIteration value = self.start self.start = self.step await asyncio.sleep(0.1) # Simulate some async work return value async def main(): async for i in AsyncRange(0, 5): print(i) asyncio.run(main())هذه الفئة غير المتزامنة تنفذ بروتوكول ITERATOR ASYNC ، مما يسمح باستخدامه في ASYNC للحلقات.
أخيرًا ، دعونا نلقي نظرة على تنفيذ مديري سياق ASYNC المخصصين. هذه مفيدة لإدارة الموارد التي تحتاج إلى الحصول عليها وإصدارها بشكل غير متزامن:
class AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(1) # Simulate async acquisition return self async def __aexit__(self, exc_type, exc, tb): print("Releasing resource") await asyncio.sleep(1) # Simulate async release async def main(): async with AsyncResource() as resource: print("Using resource") await asyncio.sleep(1) asyncio.run(main())هذه فئة Asyncresource تنفذ الأساليب AENTER و AEXIT ، مما يسمح باستخدامه مع ASYNC مع العبارة.
في الختام ، يوفر نظام Python's Coroutine أساسًا قويًا لبناء بدائل غير متزامنة مخصصة. من خلال فهم الآليات والبروتوكولات الأساسية ، يمكنك إنشاء حلول مصممة لتحديات غير متزامنة محددة ، وتحسين الأداء في السيناريوهات المتزامنة المعقدة ، وتوسيع قدرات Python Async. تذكر أنه على الرغم من أن هذه التطبيقات المخصصة رائعة للتعلم وحالات الاستخدام المحددة ، إلا أن مكتبة Asyncio المدمجة في Python محسّنة للغاية ويجب أن تكون الخاصة بك في معظم السيناريوهات. ترميز سعيد!
إبداعاتنا
تأكد من مراجعة إبداعاتنا:
Investor Central | Smart Living | epochs & أصداء | أسرار محير | Hindutva | Elite dev | مدارس JS
نحن على متوسط
Tech Koala Insights | epochs & أصداء العالم | المستثمر المركزي المتوسط | أسرار محيرة متوسطة | Science & Epochs Medium | Modern Hindutva
تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.
Copyright© 2022 湘ICP备2022001581号-3