In modern web development, we often deal with events, whether it's handling incoming WebSocket messages, server-sent events (SSE), or data streams from services like Redis Pub/Sub. While Node.js provides event-driven capabilities, it lacks an out-of-the-box way to asynchronously iterate over events using for await...of loops.
In this post, I'll walk you through a simple yet powerful way to create an asynchronous event iterator using TypeScript and AsyncGenerator. This approach is designed to allow you to consume events from any kind of event emitter in a clean and predictable way, with full control over cancellation and cleanup logic.
In one of my recent projects, I needed to listen to Redis Pub/Sub channels and dispatch server-sent events (SSE) asynchronously to connected clients. The challenge was handling incoming events without overwhelming the system while allowing the consumer to cancel the event stream at any time.
The solution? An event iterator that converts any event emitter (such as Redis Pub/Sub) into an asynchronous iterable. This allows us to process events in a controlled manner and gracefully handle cancellation when necessary.
Let’s dive into the implementation.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
This function accepts a subscriber function that you can hook into any event emitter or pub/sub system. The subscriber provides two essential methods:
The function returns an AsyncGenerator
Context Object:
The Context
Event Queue:
The events: T[] array serves as a buffer to store emitted events. The generator will process these events one by one. If there are no events in the queue, it will wait for the next event to be emitted.
Emit Logic:
The emit function adds new events to the queue and resolves any pending promise (i.e., if the generator is waiting for new events).
Cancellation:
If the cancel function is called, it sets a flag (cancelled = true) to signal that the loop should exit. Any remaining events in the queue will still be processed before the generator completes.
Cleanup:
After cancellation, the generator will invoke the unsubscribe function (if provided) to perform any necessary cleanup. This is especially important for unsubscribing from external systems like Redis or cleaning up resources.
Let’s see how we can use this event iterator to listen to Redis Pub/Sub and asynchronously iterate over the incoming messages.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
In this example, we use createEventIterator to subscribe to a Redis Pub/Sub channel and asynchronously iterate over the messages. Each time a new message arrives, it is emitted into the generator, where we can process it in real-time. If a specific message (e.g., "STOP") is received, we break the loop and unsubscribe from Redis.
Here's how you can use createEventIterator with Node.js's EventEmitter:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
In this example:
Asynchronous Control: By leveraging the AsyncGenerator, we can handle events asynchronously, process them at our own pace, and pause processing when needed.
Cancellation: The ability to cancel the event stream at any time makes this approach flexible, especially in real-world scenarios where connections may need to be closed gracefully.
General-Purpose: This iterator can be used for any event emitter or Pub/Sub system, making it versatile for different applications.
Event-driven architectures are a cornerstone of many modern web applications, but they can become tricky to manage when we need to control the flow of events asynchronously. With the power of AsyncGenerator in TypeScript, you can build elegant solutions like this event iterator, making your event-handling code cleaner and easier to maintain.
I hope this post helps you get started with async iterators for your own event emitters. If you have any questions or thoughts, feel free to share them in the comments!
Disclaimer: All resources provided are partly from the Internet. If there is any infringement of your copyright or other rights and interests, please explain the detailed reasons and provide proof of copyright or rights and interests and then send it to the email: [email protected] We will handle it for you as soon as possible.
Copyright© 2022 湘ICP备2022001581号-3