"إذا أراد العامل أن يؤدي عمله بشكل جيد، فعليه أولاً أن يشحذ أدواته." - كونفوشيوس، "مختارات كونفوشيوس. لو لينجونج"
الصفحة الأمامية > برمجة > Master Python Coroutines: إنشاء أداة غير متزامنة مخصصة للتطبيقات المتزامنة القوية

Master Python Coroutines: إنشاء أداة غير متزامنة مخصصة للتطبيقات المتزامنة القوية

نشر في 2025-04-29
تصفح:841

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

coroutines في Python هي أداة قوية لكتابة الكود غير المتزامن. لقد أحدثوا ثورة في كيفية التعامل مع العمليات المتزامنة ، مما يسهل بناء تطبيقات قابلة للتطوير وفعالة. لقد أمضيت الكثير من الوقت في العمل مع Coroutines ، وأنا متحمس لمشاركة بعض الأفكار حول إنشاء بدائل غير متزامنة مخصصة.

لنبدأ بالأساسيات. Coroutines هي وظائف خاصة يمكن إيقافها وإعادة تأمينها ، مما يسمح بمهام متعددة التعاون. إنها أساس بناء جملة Async/Await's Python. عندما تحدد coroutine ، فأنت تنشئ بشكل أساسي وظيفة يمكن أن تعطي التحكم في حلقة الحدث ، مما يسمح للمهام الأخرى بالتشغيل.

لإنشاء كائن قابل للانتظار مخصص ، تحتاج إلى تنفيذ طريقة في انتظار . يجب أن تعيد هذه الطريقة مؤلف. إليك مثال بسيط:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

يمكن استخدام هذه الفئة المخصصة مع الكلمة الرئيسية في انتظار ، تمامًا مثل Awaitables المدمجة. عندما تنتظر ، فإنه يعطي التحكم مرة واحدة ، ثم يعيد قيمته.

ولكن ماذا لو كنا نريد إنشاء بدايات غير متزامنة أكثر تعقيدًا؟ دعونا نلقي نظرة على تنفيذ إشارة مخصصة. تُستخدم Semaphores للتحكم في الوصول إلى مورد مشترك بواسطة Coroutines متعددة:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value 



تنفذ فئة CustomSemaphore هذه أساليب الاستحواذ والإفراج ، بالإضافة إلى بروتوكول مدير سياق ASYNC ( AENTER و aexit ). إنها تسمح بحد أقصى اثنين من coroutines بالحصول على semaphore في وقت واحد.

الآن ، دعنا نتحدث عن إنشاء حلقات أحداث فعالة. في حين أن Python's Asyncio يوفر تطبيقًا قويًا للحدث ، فقد تكون هناك حالات تحتاج فيها إلى حالة مخصصة. إليك مثال أساسي على حلقة حدث مخصصة:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

تنفذ حلقة الحدث المخصصة هذه الوظائف الأساسية مثل تشغيل المهام ، والتعامل مع coroutines ، وحتى وظيفة النوم البسيطة. إنها ليست غنية بالميزات مثل حلقة الأحداث المدمجة في Python ، ولكنها توضح المفاهيم الأساسية.

أحد التحديات في كتابة الكود غير المتزامن هو إدارة أول أولويات المهمة. على الرغم من أن Asyncio من Python لا يوفر قوائم أولوية مدمجة للمهام ، يمكننا تنفيذنا:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock   delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())

يستخدم هذا priorityeventloop قائمة انتظار كومة لإدارة المهام بناءً على وقت التنفيذ المجدول. يمكنك تعيين الأولويات من خلال جدولة المهام مع تأخيرات مختلفة.

التعامل مع الإلغاء بأمان هو جانب آخر مهم في العمل مع coroutines. إليك مثال على كيفية تنفيذ المهام القابلة للإلغاء:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

في هذا المثال ، يمسك cancellable_operation بـ cancellederror ، ويؤدي أي تنظيف ضروري ، ثم يعيد إعادة الاستثناء. هذا يسمح بالتعامل الرشيق للإلغاء مع استمرار نشر حالة الإلغاء.

دعنا نستكشف تنفيذ تكرار ASYNC المخصص. هذه مفيدة لإنشاء تسلسلات يمكن تكرارها بشكل غير متزامن:

class AsyncRange:
    def __init__(self, start, stop, step=1):
        self.start = start
        self.stop = stop
        self.step = step

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        value = self.start
        self.start  = self.step
        await asyncio.sleep(0.1)  # Simulate some async work
        return value

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())

هذه الفئة غير المتزامنة تنفذ بروتوكول ITERATOR ASYNC ، مما يسمح باستخدامه في ASYNC للحلقات.

أخيرًا ، دعونا نلقي نظرة على تنفيذ مديري سياق ASYNC المخصصين. هذه مفيدة لإدارة الموارد التي تحتاج إلى الحصول عليها وإصدارها بشكل غير متزامن:

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async acquisition
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async release

async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(1)

asyncio.run(main())

هذه فئة Asyncresource تنفذ الأساليب AENTER و AEXIT ، مما يسمح باستخدامه مع ASYNC مع العبارة.

في الختام ، يوفر نظام Python's Coroutine أساسًا قويًا لبناء بدائل غير متزامنة مخصصة. من خلال فهم الآليات والبروتوكولات الأساسية ، يمكنك إنشاء حلول مصممة لتحديات غير متزامنة محددة ، وتحسين الأداء في السيناريوهات المتزامنة المعقدة ، وتوسيع قدرات Python Async. تذكر أنه على الرغم من أن هذه التطبيقات المخصصة رائعة للتعلم وحالات الاستخدام المحددة ، إلا أن مكتبة Asyncio المدمجة في Python محسّنة للغاية ويجب أن تكون الخاصة بك في معظم السيناريوهات. ترميز سعيد!


إبداعاتنا

تأكد من مراجعة إبداعاتنا:

Investor Central | Smart Living | epochs & أصداء | أسرار محير | Hindutva | Elite dev | مدارس JS


نحن على متوسط

Tech Koala Insights | epochs & أصداء العالم | المستثمر المركزي المتوسط ​​ | أسرار محيرة متوسطة | Science & Epochs Medium | Modern Hindutva

بيان الافراج تتم إعادة طباعة هذه المقالة على: https://dev.to/aaravjoshi/master-python-coroutines-create-custom-async-tools-for--sourful-concurrent-apps-1dpc؟1 إذا كان هناك أي انتهاك ، يرجى الاتصال [email protected] بحدها.
أحدث البرنامج التعليمي أكثر>

تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.

Copyright© 2022 湘ICP备2022001581号-3