«Если рабочий хочет хорошо выполнять свою работу, он должен сначала заточить свои инструменты» — Конфуций, «Аналитики Конфуция. Лу Лингун»
титульная страница > программирование > Асинхронная перебор эмиттеров событий в TypeScript с помощью асинхронных генераторов

Асинхронная перебор эмиттеров событий в TypeScript с помощью асинхронных генераторов

Опубликовано 8 ноября 2024 г.
Просматривать:930

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Введение

В современной веб-разработке мы часто имеем дело с событиями, будь то обработка входящих сообщений WebSocket, события, отправленные сервером (SSE) или потоки данных от таких служб, как Redis Pub/Sub. Хотя Node.js предоставляет возможности управления событиями, в нем отсутствует готовый способ асинхронного перебора событий с использованием циклов await...of.

В этом посте я покажу вам простой, но мощный способ создания асинхронного итератора событий с использованием TypeScript и AsyncGenerator. Этот подход разработан, чтобы позволить вам получать события от любого источника событий чистым и предсказуемым способом с полным контролем над логикой отмены и очистки.

Вариант использования: Redis Pub/Sub

В одном из моих недавних проектов мне нужно было прослушивать каналы Redis Pub/Sub и асинхронно отправлять события, отправленные сервером (SSE), подключенным клиентам. Задача заключалась в том, чтобы обрабатывать входящие события, не перегружая систему, и в то же время позволяя потребителю отменить поток событий в любое время.

Решение? Итератор событий, который преобразует любой источник событий (например, Redis Pub/Sub) в асинхронный итерируемый объект. Это позволяет нам контролировать события и корректно обрабатывать их отмену, когда это необходимо.

Давайте углубимся в реализацию.

Кодекс

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Как это работает

Эта функция принимает функцию подписчика, которую вы можете подключить к любому источнику событий или системе публикации/подписки. Подписчик предоставляет два основных метода:

  1. emit: позволяет подписчику отправлять новые события в итератор.
  2. отмена: предоставляет способ сигнализировать о необходимости остановки итерации.

Функция возвращает AsyncGenerator, позволяя перебирать события с помощью цикла for await...of.

Разбираем код

  1. Объект контекста:
    Тип Context предоставляет интерфейс для создания новых событий или отмены подписки. Подписчик использует этот контекст для управления потоком событий.

  2. Очередь событий:
    Массив event: T[] служит буфером для хранения исходящих событий. Генератор будет обрабатывать эти события одно за другим. Если в очереди нет событий, она будет ждать отправки следующего события.

  3. Выдать логику:
    Функция Emit добавляет новые события в очередь и разрешает любые ожидающие обещания (т. е., если генератор ожидает новых событий).

  4. Отмена:
    Если вызывается функция отмены, она устанавливает флаг (cancelled = true), сигнализирующий о необходимости выхода из цикла. Любые оставшиеся события в очереди будут обработаны до завершения работы генератора.

  5. Очистка:
    После отмены генератор вызовет функцию отказа от подписки (если она предусмотрена) для выполнения необходимой очистки. Это особенно важно для отказа от подписки на внешние системы, такие как Redis, или очистки ресурсов.

Пример: прослушивание Redis Pub/Sub

Давайте посмотрим, как мы можем использовать этот итератор событий для прослушивания Redis Pub/Sub и асинхронного перебора входящих сообщений.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

В этом примере мы используем createEventIterator для подписки на канал Redis Pub/Sub и асинхронного перебора сообщений. Каждый раз, когда приходит новое сообщение, оно передается в генератор, где мы можем обработать его в режиме реального времени. Если получено определенное сообщение (например, «СТОП»), мы прерываем цикл и отписываемся от Redis.

Пример: использование EventEmitter

Вот как вы можете использовать createEventIterator с EventEmitter Node.js:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

В этом примере:

  • Мы используем EventEmitter для генерации событий, которые фиксируются createEventIterator.
  • Итератор прослушивает событие data и обрабатывает его асинхронно.
  • Как и в примере с Redis, мы можем остановить итерацию при получении определенного события («STOP»).

Преимущества этого подхода

  • Асинхронное управление: используя AsyncGenerator, мы можем обрабатывать события асинхронно, обрабатывать их в удобном для нас темпе и при необходимости приостанавливать обработку.

  • Отмена: возможность отмены потока событий в любое время делает этот подход гибким, особенно в реальных сценариях, где соединения может потребоваться корректно закрыть.

  • Общего назначения: этот итератор можно использовать для любого источника событий или системы публикации/подписки, что делает его универсальным для различных приложений.

Заключение

Событийно-ориентированная архитектура является краеугольным камнем многих современных веб-приложений, но ею может быть сложно управлять, когда нам нужно асинхронно контролировать поток событий. Благодаря возможностям AsyncGenerator в TypeScript вы можете создавать элегантные решения, такие как этот итератор событий, делая ваш код обработки событий более чистым и простым в обслуживании.

Надеюсь, этот пост поможет вам начать работу с асинхронными итераторами для ваших собственных генераторов событий. Если у вас есть какие-либо вопросы или мысли, поделитесь ими в комментариях!

Заявление о выпуске Эта статья воспроизведена по адресу: https://dev.to/redjohnsh/asynchronous-iterating-over-event-emitters-in-typescript-with-async-generators-3mk?1 Если есть какие-либо нарушения, пожалуйста, свяжитесь с Study_golang@163 .com, чтобы удалить его
Последний учебник Более>

Изучайте китайский

Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.

Copyright© 2022 湘ICP备2022001581号-3