„Wenn ein Arbeiter seine Arbeit gut machen will, muss er zuerst seine Werkzeuge schärfen.“ – Konfuzius, „Die Gespräche des Konfuzius. Lu Linggong“
Titelseite > Programmierung > Asynchrones Iterieren über Ereignisemitter in TypeScript mit Async-Generatoren

Asynchrones Iterieren über Ereignisemitter in TypeScript mit Async-Generatoren

Veröffentlicht am 08.11.2024
Durchsuche:454

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Einführung

In der modernen Webentwicklung beschäftigen wir uns häufig mit Ereignissen, sei es die Verarbeitung eingehender WebSocket-Nachrichten, vom Server gesendeter Ereignisse (SSE) oder Datenströme von Diensten wie Redis Pub/Sub. Obwohl Node.js ereignisgesteuerte Funktionen bietet, fehlt ihm eine sofort einsatzbereite Möglichkeit, mithilfe von for-await...of-Schleifen asynchron über Ereignisse zu iterieren.

In diesem Beitrag zeige ich Ihnen eine einfache, aber leistungsstarke Methode zum Erstellen eines asynchronen Ereignisiterators mit TypeScript und AsyncGenerator. Dieser Ansatz soll es Ihnen ermöglichen, Ereignisse von jeder Art von Ereignisemitter auf saubere und vorhersehbare Weise zu konsumieren, mit vollständiger Kontrolle über die Abbruch- und Bereinigungslogik.

Der Anwendungsfall: Redis Pub/Sub

In einem meiner letzten Projekte musste ich Redis Pub/Sub-Kanäle abhören und vom Server gesendete Ereignisse (SSE) asynchron an verbundene Clients versenden. Die Herausforderung bestand darin, eingehende Ereignisse zu verarbeiten, ohne das System zu überlasten, und gleichzeitig dem Verbraucher die Möglichkeit zu geben, den Ereignisstream jederzeit abzubrechen.

Die Lösung? Ein Ereignisiterator, der jeden Ereignisemitter (z. B. Redis Pub/Sub) in ein asynchrones Iterable umwandelt. Dadurch können wir Ereignisse kontrolliert verarbeiten und bei Bedarf problemlos abbrechen.

Lassen Sie uns in die Implementierung eintauchen.

Der Kodex

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Wie es funktioniert

Diese Funktion akzeptiert eine Abonnentenfunktion, die Sie in jeden Event-Emitter oder jedes Pub/Sub-System einbinden können. Der Abonnent stellt zwei wesentliche Methoden bereit:

  1. emit: Ermöglicht dem Abonnenten, neue Ereignisse in den Iterator zu übertragen.
  2. Abbrechen: Bietet eine Möglichkeit zu signalisieren, dass die Iteration gestoppt werden soll.

Die Funktion gibt einen AsyncGenerator zurück, der es Ihnen ermöglicht, mithilfe einer for-await...of-Schleife über Ereignisse zu iterieren.

Den Code aufschlüsseln

  1. Kontextobjekt:
    Der Typ „Context“ stellt eine Schnittstelle zum Ausgeben neuer Ereignisse oder zum Kündigen des Abonnements bereit. Der Abonnent nutzt diesen Kontext, um den Ablauf von Ereignissen zu steuern.

  2. Ereigniswarteschlange:
    Das Array events: T[] dient als Puffer zum Speichern ausgegebener Ereignisse. Der Generator verarbeitet diese Ereignisse nacheinander. Wenn sich keine Ereignisse in der Warteschlange befinden, wird auf die Ausgabe des nächsten Ereignisses gewartet.

  3. Logik ausgeben:
    Die Emit-Funktion fügt der Warteschlange neue Ereignisse hinzu und löst alle ausstehenden Versprechen auf (d. h. wenn der Generator auf neue Ereignisse wartet).

  4. Stornierung:
    Wenn die Abbruchfunktion aufgerufen wird, setzt sie ein Flag (abgebrochen = wahr), um zu signalisieren, dass die Schleife beendet werden soll. Alle verbleibenden Ereignisse in der Warteschlange werden noch verarbeitet, bevor der Generator abgeschlossen ist.

  5. Aufräumen:
    Nach dem Abbruch ruft der Generator die Abmeldefunktion auf (falls vorhanden), um alle erforderlichen Bereinigungen durchzuführen. Dies ist besonders wichtig, um sich von externen Systemen wie Redis abzumelden oder Ressourcen zu bereinigen.

Beispiel: Anhören von Redis Pub/Sub

Sehen wir uns an, wie wir diesen Ereignisiterator verwenden können, um Redis Pub/Sub abzuhören und die eingehenden Nachrichten asynchron zu iterieren.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

In diesem Beispiel verwenden wir createEventIterator, um einen Redis Pub/Sub-Kanal zu abonnieren und die Nachrichten asynchron zu durchlaufen. Jedes Mal, wenn eine neue Nachricht eintrifft, wird sie an den Generator gesendet, wo wir sie in Echtzeit verarbeiten können. Wenn eine bestimmte Nachricht (z. B. „STOP“) empfangen wird, unterbrechen wir die Schleife und melden uns von Redis ab.

Beispiel: Verwendung von EventEmitter

So können Sie createEventIterator mit dem EventEmitter von Node.js verwenden:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

In diesem Beispiel:

  • Wir verwenden EventEmitter, um Ereignisse auszusenden, die von createEventIterator erfasst werden.
  • Der Iterator wartet auf das „Daten“-Ereignis und verarbeitet es asynchron.
  • Ähnlich wie im Redis-Beispiel können wir die Iteration stoppen, wenn ein bestimmtes Ereignis („STOP“) empfangen wird.

Vorteile dieses Ansatzes

  • Asynchrone Steuerung: Durch die Nutzung des AsyncGenerator können wir Ereignisse asynchron verarbeiten, sie in unserem eigenen Tempo verarbeiten und die Verarbeitung bei Bedarf anhalten.

  • Abbruch: Die Möglichkeit, den Ereignisstrom jederzeit abzubrechen, macht diesen Ansatz flexibel, insbesondere in realen Szenarien, in denen Verbindungen möglicherweise ordnungsgemäß geschlossen werden müssen.

  • Allgemeiner Zweck: Dieser Iterator kann für jeden Event-Emitter oder jedes Pub/Sub-System verwendet werden, wodurch er für verschiedene Anwendungen vielseitig einsetzbar ist.

Abschluss

Ereignisgesteuerte Architekturen sind ein Eckpfeiler vieler moderner Webanwendungen, aber ihre Verwaltung kann schwierig werden, wenn wir den Fluss von Ereignissen asynchron steuern müssen. Mit der Leistungsfähigkeit von AsyncGenerator in TypeScript können Sie elegante Lösungen wie diesen Event-Iterator erstellen, wodurch Ihr Event-Handling-Code sauberer und einfacher zu warten ist.

Ich hoffe, dieser Beitrag hilft Ihnen beim Einstieg in asynchrone Iteratoren für Ihre eigenen Event-Emitter. Wenn Sie Fragen oder Gedanken haben, teilen Sie diese gerne in den Kommentaren mit!

Freigabeerklärung Dieser Artikel ist abgedruckt unter: https://dev.to/redjohnsh/asynchronously-iterating-over-event-emitters-in-typescript-with-async-generators-3mk?1 Bei Verstößen wenden Sie sich bitte an Study_golang@163 .com, um es zu löschen
Neuestes Tutorial Mehr>

Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.

Copyright© 2022 湘ICP备2022001581号-3