"Se um trabalhador quiser fazer bem o seu trabalho, ele deve primeiro afiar suas ferramentas." - Confúcio, "Os Analectos de Confúcio. Lu Linggong"
Primeira página > Programação > Iteração assíncrona sobre emissores de eventos em TypeScript com geradores assíncronos

Iteração assíncrona sobre emissores de eventos em TypeScript com geradores assíncronos

Publicado em 2024-11-08
Navegar:374

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Introdução

No desenvolvimento web moderno, muitas vezes lidamos com eventos, seja lidando com mensagens WebSocket recebidas, eventos enviados pelo servidor (SSE) ou fluxos de dados de serviços como Redis Pub/Sub. Embora o Node.js forneça recursos orientados a eventos, ele não possui uma maneira pronta para uso de iterar eventos de forma assíncrona usando for await...of loops.

Nesta postagem, mostrarei uma maneira simples, mas poderosa, de criar um iterador de evento assíncrono usando TypeScript e AsyncGenerator. Essa abordagem foi projetada para permitir que você consuma eventos de qualquer tipo de emissor de eventos de maneira limpa e previsível, com controle total sobre a lógica de cancelamento e limpeza.

O caso de uso: Redis Pub/Sub

Em um de meus projetos recentes, precisei ouvir os canais Redis Pub/Sub e despachar eventos enviados pelo servidor (SSE) de forma assíncrona para clientes conectados. O desafio era lidar com os eventos recebidos sem sobrecarregar o sistema e, ao mesmo tempo, permitir que o consumidor cancelasse a transmissão do evento a qualquer momento.

A solução? Um iterador de evento que converte qualquer emissor de evento (como Redis Pub/Sub) em um iterável assíncrono. Isso nos permite processar eventos de maneira controlada e lidar com cancelamentos quando necessário.

Vamos mergulhar na implementação.

O Código

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Como funciona

Esta função aceita uma função de assinante que você pode conectar a qualquer emissor de evento ou sistema pub/sub. O assinante fornece dois métodos essenciais:

  1. emit: permite que o assinante envie novos eventos para o iterador.
  2. cancel: Fornece uma maneira de sinalizar que a iteração deve parar.

A função retorna um AsyncGenerator, permitindo que você itere sobre eventos usando um loop for await...of.

Quebrando o Código

  1. Objeto de contexto:
    O tipo Context fornece uma interface para emitir novos eventos ou cancelar a assinatura. O assinante usa esse contexto para controlar o fluxo de eventos.

  2. Fila de eventos:
    O array events: T[] serve como um buffer para armazenar eventos emitidos. O gerador processará esses eventos um por um. Se não houver eventos na fila, ele aguardará a emissão do próximo evento.

  3. Emitir Lógica:
    A função emit adiciona novos eventos à fila e resolve qualquer promessa pendente (ou seja, se o gerador estiver aguardando novos eventos).

  4. Cancelamento:
    Se a função cancel for chamada, ela define um sinalizador (cancelled = true) para sinalizar que o loop deve sair. Quaisquer eventos restantes na fila ainda serão processados ​​antes da conclusão do gerador.

  5. Limpar:
    Após o cancelamento, o gerador invocará a função de cancelamento de assinatura (se fornecida) para realizar qualquer limpeza necessária. Isso é especialmente importante para cancelar a assinatura de sistemas externos como Redis ou limpar recursos.

Exemplo: ouvindo Redis Pub/Sub

Vamos ver como podemos usar esse iterador de evento para ouvir o Redis Pub/Sub e iterar de forma assíncrona nas mensagens recebidas.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

Neste exemplo, usamos createEventIterator para assinar um canal Redis Pub/Sub e iterar de forma assíncrona nas mensagens. Cada vez que chega uma nova mensagem, ela é emitida para o gerador, onde podemos processá-la em tempo real. Se uma mensagem específica (por exemplo, "STOP") for recebida, interrompemos o ciclo e cancelamos a assinatura do Redis.

Exemplo: usando EventEmitter

Veja como você pode usar createEventIterator com EventEmitter do Node.js:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

Neste exemplo:

  • Usamos EventEmitter para emitir eventos, que são capturados por createEventIterator.
  • O iterador escuta o evento 'data' e o processa de forma assíncrona.
  • Semelhante ao exemplo do Redis, podemos interromper a iteração quando um evento específico ('STOP') for recebido.

Benefícios desta abordagem

  • Controle assíncrono: Ao aproveitar o AsyncGenerator, podemos lidar com eventos de forma assíncrona, processá-los em nosso próprio ritmo e pausar o processamento quando necessário.

  • Cancelamento: a capacidade de cancelar o stream do evento a qualquer momento torna essa abordagem flexível, especialmente em cenários do mundo real onde as conexões podem precisar ser fechadas normalmente.

  • Uso geral: este iterador pode ser usado para qualquer emissor de evento ou sistema Pub/Sub, tornando-o versátil para diferentes aplicações.

Conclusão

As arquiteturas orientadas a eventos são a base de muitos aplicativos da Web modernos, mas podem se tornar difíceis de gerenciar quando precisamos controlar o fluxo de eventos de forma assíncrona. Com o poder do AsyncGenerator no TypeScript, você pode construir soluções elegantes como este iterador de eventos, tornando seu código de manipulação de eventos mais limpo e fácil de manter.

Espero que esta postagem ajude você a começar com iteradores assíncronos para seus próprios emissores de eventos. Se você tiver alguma dúvida ou opinião, sinta-se à vontade para compartilhá-la nos comentários!

Declaração de lançamento Este artigo foi reproduzido em: https://dev.to/redjohnsh/asynchronously-iterating-over-event-emitters-in-typescript-with-async-generators-3mk?1 Se houver alguma violação, entre em contato com study_golang@163 .com para excluí-lo
Tutorial mais recente Mais>

Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.

Copyright© 2022 湘ICP备2022001581号-3