No desenvolvimento web moderno, muitas vezes lidamos com eventos, seja lidando com mensagens WebSocket recebidas, eventos enviados pelo servidor (SSE) ou fluxos de dados de serviços como Redis Pub/Sub. Embora o Node.js forneça recursos orientados a eventos, ele não possui uma maneira pronta para uso de iterar eventos de forma assíncrona usando for await...of loops.
Nesta postagem, mostrarei uma maneira simples, mas poderosa, de criar um iterador de evento assíncrono usando TypeScript e AsyncGenerator. Essa abordagem foi projetada para permitir que você consuma eventos de qualquer tipo de emissor de eventos de maneira limpa e previsível, com controle total sobre a lógica de cancelamento e limpeza.
Em um de meus projetos recentes, precisei ouvir os canais Redis Pub/Sub e despachar eventos enviados pelo servidor (SSE) de forma assíncrona para clientes conectados. O desafio era lidar com os eventos recebidos sem sobrecarregar o sistema e, ao mesmo tempo, permitir que o consumidor cancelasse a transmissão do evento a qualquer momento.
A solução? Um iterador de evento que converte qualquer emissor de evento (como Redis Pub/Sub) em um iterável assíncrono. Isso nos permite processar eventos de maneira controlada e lidar com cancelamentos quando necessário.
Vamos mergulhar na implementação.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Esta função aceita uma função de assinante que você pode conectar a qualquer emissor de evento ou sistema pub/sub. O assinante fornece dois métodos essenciais:
A função retorna um AsyncGenerator
Objeto de contexto:
O tipo Context
Fila de eventos:
O array events: T[] serve como um buffer para armazenar eventos emitidos. O gerador processará esses eventos um por um. Se não houver eventos na fila, ele aguardará a emissão do próximo evento.
Emitir Lógica:
A função emit adiciona novos eventos à fila e resolve qualquer promessa pendente (ou seja, se o gerador estiver aguardando novos eventos).
Cancelamento:
Se a função cancel for chamada, ela define um sinalizador (cancelled = true) para sinalizar que o loop deve sair. Quaisquer eventos restantes na fila ainda serão processados antes da conclusão do gerador.
Limpar:
Após o cancelamento, o gerador invocará a função de cancelamento de assinatura (se fornecida) para realizar qualquer limpeza necessária. Isso é especialmente importante para cancelar a assinatura de sistemas externos como Redis ou limpar recursos.
Vamos ver como podemos usar esse iterador de evento para ouvir o Redis Pub/Sub e iterar de forma assíncrona nas mensagens recebidas.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
Neste exemplo, usamos createEventIterator para assinar um canal Redis Pub/Sub e iterar de forma assíncrona nas mensagens. Cada vez que chega uma nova mensagem, ela é emitida para o gerador, onde podemos processá-la em tempo real. Se uma mensagem específica (por exemplo, "STOP") for recebida, interrompemos o ciclo e cancelamos a assinatura do Redis.
Veja como você pode usar createEventIterator com EventEmitter do Node.js:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
Neste exemplo:
Controle assíncrono: Ao aproveitar o AsyncGenerator, podemos lidar com eventos de forma assíncrona, processá-los em nosso próprio ritmo e pausar o processamento quando necessário.
Cancelamento: a capacidade de cancelar o stream do evento a qualquer momento torna essa abordagem flexível, especialmente em cenários do mundo real onde as conexões podem precisar ser fechadas normalmente.
Uso geral: este iterador pode ser usado para qualquer emissor de evento ou sistema Pub/Sub, tornando-o versátil para diferentes aplicações.
As arquiteturas orientadas a eventos são a base de muitos aplicativos da Web modernos, mas podem se tornar difíceis de gerenciar quando precisamos controlar o fluxo de eventos de forma assíncrona. Com o poder do AsyncGenerator no TypeScript, você pode construir soluções elegantes como este iterador de eventos, tornando seu código de manipulação de eventos mais limpo e fácil de manter.
Espero que esta postagem ajude você a começar com iteradores assíncronos para seus próprios emissores de eventos. Se você tiver alguma dúvida ou opinião, sinta-se à vontade para compartilhá-la nos comentários!
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3