"إذا أراد العامل أن يؤدي عمله بشكل جيد، فعليه أولاً أن يشحذ أدواته." - كونفوشيوس، "مختارات كونفوشيوس. لو لينجونج"
الصفحة الأمامية > برمجة > التكرار بشكل غير متزامن على بواعث الأحداث في TypeScript باستخدام المولدات غير المتزامنة

التكرار بشكل غير متزامن على بواعث الأحداث في TypeScript باستخدام المولدات غير المتزامنة

تم النشر بتاريخ 2024-11-08
تصفح:757

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

مقدمة

في تطوير الويب الحديث، غالبًا ما نتعامل مع الأحداث، سواء كان ذلك التعامل مع رسائل WebSocket الواردة، أو الأحداث المرسلة من الخادم (SSE)، أو تدفقات البيانات من خدمات مثل Redis Pub/Sub. على الرغم من أن Node.js يوفر إمكانات تعتمد على الأحداث، إلا أنه يفتقر إلى طريقة مبتكرة للتكرار بشكل غير متزامن على الأحداث باستخدام حلقات الانتظار.

في هذا المنشور، سأوجهك عبر طريقة بسيطة لكنها فعالة لإنشاء مكرر حدث غير متزامن باستخدام TypeScript وAsyncGenerator. تم تصميم هذا الأسلوب للسماح لك باستهلاك الأحداث من أي نوع من باعث الأحداث بطريقة نظيفة ويمكن التنبؤ بها، مع التحكم الكامل في منطق الإلغاء والتنظيف.

حالة الاستخدام: Redis Pub/Sub

في أحد مشاريعي الأخيرة، كنت بحاجة إلى الاستماع إلى قنوات Redis Pub/Sub وإرسال الأحداث المرسلة من الخادم (SSE) بشكل غير متزامن إلى العملاء المتصلين. كان التحدي يتمثل في التعامل مع الأحداث الواردة دون إرباك النظام مع السماح للمستهلك بإلغاء بث الحدث في أي وقت.

الحل؟ مكرر الحدث الذي يحول أي باعث حدث (مثل Redis Pub/Sub) إلى كائن قابل للتكرار غير متزامن. وهذا يسمح لنا بمعالجة الأحداث بطريقة خاضعة للرقابة والتعامل مع الإلغاء بأمان عند الضرورة.

دعونا نتعمق في التنفيذ.

الكود

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

كيف يعمل

تقبل هذه الوظيفة وظيفة المشترك التي يمكنك ربطها بأي باعث حدث أو نظام حانة/فرعي. يوفر المشترك طريقتين أساسيتين:

  1. emit: يسمح للمشترك بدفع أحداث جديدة إلى المكرر.
  2. إلغاء: يوفر طريقة للإشارة إلى أن التكرار يجب أن يتوقف.

تقوم الدالة بإرجاع AsyncGenerator، مما يسمح لك بالتكرار على الأحداث باستخدام حلقة انتظار... of.

كسر الكود

  1. كائن السياق :
    يوفر نوع السياق واجهة لإصدار أحداث جديدة أو إلغاء الاشتراك. يستخدم المشترك هذا السياق للتحكم في تدفق الأحداث.

  2. قائمة انتظار الأحداث:
    الأحداث: تعمل صفيف T[] كمخزن مؤقت لتخزين الأحداث المنبعثة. سيقوم المولد بمعالجة هذه الأحداث واحدًا تلو الآخر. إذا لم تكن هناك أحداث في قائمة الانتظار، فسوف تنتظر حتى يتم إصدار الحدث التالي.

  3. منطق الانبعاث:
    تضيف وظيفة الإرسال أحداثًا جديدة إلى قائمة الانتظار وتحل أي وعد معلق (على سبيل المثال، إذا كان المولد ينتظر أحداثًا جديدة).

  4. إلغاء:
    إذا تم استدعاء وظيفة الإلغاء، فإنها تقوم بتعيين علامة (تم الإلغاء = صحيح) للإشارة إلى ضرورة خروج الحلقة. ستتم معالجة أي أحداث متبقية في قائمة الانتظار قبل اكتمال عملية المولد.

  5. تنظيف:
    بعد الإلغاء، سيقوم المولد باستدعاء وظيفة إلغاء الاشتراك (إذا كانت متوفرة) لإجراء أي عملية تنظيف ضرورية. وهذا مهم بشكل خاص لإلغاء الاشتراك في الأنظمة الخارجية مثل Redis أو تنظيف الموارد.

مثال: الاستماع إلى Redis Pub/Sub

دعونا نرى كيف يمكننا استخدام مكرر الحدث هذا للاستماع إلى Redis Pub/Sub والتكرار بشكل غير متزامن على الرسائل الواردة.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

في هذا المثال، نستخدم createEventIterator للاشتراك في قناة Redis Pub/Sub والتكرار بشكل غير متزامن على الرسائل. في كل مرة تصل رسالة جديدة، يتم إرسالها إلى المولد، حيث يمكننا معالجتها في الوقت الفعلي. إذا تم تلقي رسالة محددة (على سبيل المثال، "STOP")، فإننا نقطع الحلقة ونلغي الاشتراك في Redis.

مثال: استخدام EventEmitter

إليك كيفية استخدام createEventIterator مع EventEmitter الخاص بـ Node.js:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

في هذا المثال:

  • نستخدم EventEmitter لإصدار الأحداث، والتي يتم التقاطها بواسطة createEventIterator.
  • يستمع المكرِّر إلى حدث "البيانات" ويعالجه بشكل غير متزامن.
  • على غرار مثال Redis، يمكننا إيقاف التكرار عند تلقي حدث معين ('STOP').

فوائد هذا النهج

  • التحكم غير المتزامن : من خلال الاستفادة من AsyncGenerator، يمكننا التعامل مع الأحداث بشكل غير متزامن، ومعالجتها بالسرعة التي تناسبنا، وإيقاف المعالجة مؤقتًا عند الحاجة.

  • الإلغاء: القدرة على إلغاء تدفق الحدث في أي وقت تجعل هذا النهج مرنًا، خاصة في سيناريوهات العالم الحقيقي حيث قد يلزم إغلاق الاتصالات بأمان.

  • للأغراض العامة : يمكن استخدام هذا المكرر لأي باعث حدث أو نظام Pub/Sub، مما يجعله متعدد الاستخدامات لتطبيقات مختلفة.

خاتمة

تعتبر البنى المبنية على الأحداث حجر الزاوية في العديد من تطبيقات الويب الحديثة، ولكن يمكن أن تصبح إدارتها صعبة عندما نحتاج إلى التحكم في تدفق الأحداث بشكل غير متزامن. بفضل قوة AsyncGenerator في TypeScript، يمكنك إنشاء حلول أنيقة مثل مُكرِّر الأحداث هذا، مما يجعل كود التعامل مع الأحداث الخاص بك أكثر وضوحًا وأسهل في الصيانة.

آمل أن يساعدك هذا المنشور في البدء باستخدام التكرارات غير المتزامنة لبواعث الأحداث الخاصة بك. إذا كان لديك أي أسئلة أو أفكار، فلا تتردد في مشاركتها في التعليقات!

بيان الافراج تم إعادة نشر هذه المقالة على: https://dev.to/redjohnsh/asynchronously-iterated-over-event-emitters-in-typescript-with-async-generators-3mk?1 إذا كان هناك أي انتهاك، يرجى الاتصال بـ Study_golang@163 .com لحذفه
أحدث البرنامج التعليمي أكثر>

تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.

Copyright© 2022 湘ICP备2022001581号-3