В современной веб-разработке мы часто имеем дело с событиями, будь то обработка входящих сообщений WebSocket, события, отправленные сервером (SSE) или потоки данных от таких служб, как Redis Pub/Sub. Хотя Node.js предоставляет возможности управления событиями, в нем отсутствует готовый способ асинхронного перебора событий с использованием циклов await...of.
В этом посте я покажу вам простой, но мощный способ создания асинхронного итератора событий с использованием TypeScript и AsyncGenerator. Этот подход разработан, чтобы позволить вам получать события от любого источника событий чистым и предсказуемым способом с полным контролем над логикой отмены и очистки.
В одном из моих недавних проектов мне нужно было прослушивать каналы Redis Pub/Sub и асинхронно отправлять события, отправленные сервером (SSE), подключенным клиентам. Задача заключалась в том, чтобы обрабатывать входящие события, не перегружая систему, и в то же время позволяя потребителю отменить поток событий в любое время.
Решение? Итератор событий, который преобразует любой источник событий (например, Redis Pub/Sub) в асинхронный итерируемый объект. Это позволяет нам контролировать события и корректно обрабатывать их отмену, когда это необходимо.
Давайте углубимся в реализацию.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Эта функция принимает функцию подписчика, которую вы можете подключить к любому источнику событий или системе публикации/подписки. Подписчик предоставляет два основных метода:
Функция возвращает AsyncGenerator
Объект контекста:
Тип Context
Очередь событий:
Массив event: T[] служит буфером для хранения исходящих событий. Генератор будет обрабатывать эти события одно за другим. Если в очереди нет событий, она будет ждать отправки следующего события.
Выдать логику:
Функция Emit добавляет новые события в очередь и разрешает любые ожидающие обещания (т. е., если генератор ожидает новых событий).
Отмена:
Если вызывается функция отмены, она устанавливает флаг (cancelled = true), сигнализирующий о необходимости выхода из цикла. Любые оставшиеся события в очереди будут обработаны до завершения работы генератора.
Очистка:
После отмены генератор вызовет функцию отказа от подписки (если она предусмотрена) для выполнения необходимой очистки. Это особенно важно для отказа от подписки на внешние системы, такие как Redis, или очистки ресурсов.
Давайте посмотрим, как мы можем использовать этот итератор событий для прослушивания Redis Pub/Sub и асинхронного перебора входящих сообщений.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
В этом примере мы используем createEventIterator для подписки на канал Redis Pub/Sub и асинхронного перебора сообщений. Каждый раз, когда приходит новое сообщение, оно передается в генератор, где мы можем обработать его в режиме реального времени. Если получено определенное сообщение (например, «СТОП»), мы прерываем цикл и отписываемся от Redis.
Вот как вы можете использовать createEventIterator с EventEmitter Node.js:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
В этом примере:
Асинхронное управление: используя AsyncGenerator, мы можем обрабатывать события асинхронно, обрабатывать их в удобном для нас темпе и при необходимости приостанавливать обработку.
Отмена: возможность отмены потока событий в любое время делает этот подход гибким, особенно в реальных сценариях, где соединения может потребоваться корректно закрыть.
Общего назначения: этот итератор можно использовать для любого источника событий или системы публикации/подписки, что делает его универсальным для различных приложений.
Событийно-ориентированная архитектура является краеугольным камнем многих современных веб-приложений, но ею может быть сложно управлять, когда нам нужно асинхронно контролировать поток событий. Благодаря возможностям AsyncGenerator в TypeScript вы можете создавать элегантные решения, такие как этот итератор событий, делая ваш код обработки событий более чистым и простым в обслуживании.
Надеюсь, этот пост поможет вам начать работу с асинхронными итераторами для ваших собственных генераторов событий. Если у вас есть какие-либо вопросы или мысли, поделитесь ими в комментариях!
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3