En el desarrollo web moderno, a menudo nos ocupamos de eventos, ya sea manejando mensajes WebSocket entrantes, eventos enviados por el servidor (SSE) o flujos de datos de servicios como Redis Pub/Sub. Si bien Node.js proporciona capacidades controladas por eventos, carece de una forma lista para usar de iterar de forma asincrónica sobre eventos usando bucles for await...of.
En esta publicación, lo guiaré a través de una manera simple pero poderosa de crear un iterador de eventos asincrónicos usando TypeScript y AsyncGenerator. Este enfoque está diseñado para permitirle consumir eventos de cualquier tipo de emisor de eventos de una manera limpia y predecible, con control total sobre la lógica de cancelación y limpieza.
En uno de mis proyectos recientes, necesitaba escuchar los canales de Redis Pub/Sub y enviar eventos enviados por el servidor (SSE) de forma asincrónica a los clientes conectados. El desafío era manejar los eventos entrantes sin sobrecargar el sistema y al mismo tiempo permitir al consumidor cancelar la transmisión del evento en cualquier momento.
¿La solución? Un iterador de eventos que convierte cualquier emisor de eventos (como Redis Pub/Sub) en un iterable asincrónico. Esto nos permite procesar eventos de manera controlada y manejar con gracia la cancelación cuando sea necesario.
Profundicemos en la implementación.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Esta función acepta una función de suscriptor que puede conectar a cualquier emisor de eventos o sistema pub/sub. El suscriptor proporciona dos métodos esenciales:
La función devuelve un AsyncGenerator
Objeto de contexto:
El tipo Context
Cola de eventos:
La matriz events: T[] sirve como búfer para almacenar eventos emitidos. El generador procesará estos eventos uno por uno. Si no hay eventos en la cola, esperará a que se emita el siguiente evento.
Emitir lógica:
La función de emisión agrega nuevos eventos a la cola y resuelve cualquier promesa pendiente (es decir, si el generador está esperando nuevos eventos).
Cancelación:
Si se llama a la función de cancelación, establece un indicador (cancelado = verdadero) para indicar que el bucle debe salir. Cualquier evento restante en la cola se procesará antes de que se complete el generador.
Limpieza:
Después de la cancelación, el generador invocará la función de cancelación de suscripción (si se proporciona) para realizar cualquier limpieza necesaria. Esto es especialmente importante para cancelar la suscripción a sistemas externos como Redis o limpiar recursos.
Veamos cómo podemos usar este iterador de eventos para escuchar Redis Pub/Sub e iterar asincrónicamente sobre los mensajes entrantes.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
En este ejemplo, utilizamos createEventIterator para suscribirnos a un canal de Redis Pub/Sub e iterar asincrónicamente sobre los mensajes. Cada vez que llega un mensaje nuevo, se emite al generador, donde podemos procesarlo en tiempo real. Si se recibe un mensaje específico (por ejemplo, "DETENER"), rompemos el ciclo y cancelamos la suscripción a Redis.
Así es como puedes usar createEventIterator con EventEmitter de Node.js:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
En este ejemplo:
Control asincrónico: al aprovechar AsyncGenerator, podemos manejar eventos de forma asincrónica, procesarlos a nuestro propio ritmo y pausar el procesamiento cuando sea necesario.
Cancelación: la capacidad de cancelar la transmisión del evento en cualquier momento hace que este enfoque sea flexible, especialmente en escenarios del mundo real donde es posible que sea necesario cerrar las conexiones correctamente.
Propósito general: Este iterador se puede usar para cualquier emisor de eventos o sistema Pub/Sub, lo que lo hace versátil para diferentes aplicaciones.
Las arquitecturas basadas en eventos son la piedra angular de muchas aplicaciones web modernas, pero pueden resultar difíciles de administrar cuando necesitamos controlar el flujo de eventos de forma asincrónica. Con el poder de AsyncGenerator en TypeScript, puedes crear soluciones elegantes como este iterador de eventos, haciendo que tu código de manejo de eventos sea más limpio y más fácil de mantener.
Espero que esta publicación te ayude a comenzar con iteradores asíncronos para tus propios emisores de eventos. Si tienes alguna pregunta o idea, ¡no dudes en compartirla en los comentarios!
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3