In der modernen Webentwicklung beschäftigen wir uns häufig mit Ereignissen, sei es die Verarbeitung eingehender WebSocket-Nachrichten, vom Server gesendeter Ereignisse (SSE) oder Datenströme von Diensten wie Redis Pub/Sub. Obwohl Node.js ereignisgesteuerte Funktionen bietet, fehlt ihm eine sofort einsatzbereite Möglichkeit, mithilfe von for-await...of-Schleifen asynchron über Ereignisse zu iterieren.
In diesem Beitrag zeige ich Ihnen eine einfache, aber leistungsstarke Methode zum Erstellen eines asynchronen Ereignisiterators mit TypeScript und AsyncGenerator. Dieser Ansatz soll es Ihnen ermöglichen, Ereignisse von jeder Art von Ereignisemitter auf saubere und vorhersehbare Weise zu konsumieren, mit vollständiger Kontrolle über die Abbruch- und Bereinigungslogik.
In einem meiner letzten Projekte musste ich Redis Pub/Sub-Kanäle abhören und vom Server gesendete Ereignisse (SSE) asynchron an verbundene Clients versenden. Die Herausforderung bestand darin, eingehende Ereignisse zu verarbeiten, ohne das System zu überlasten, und gleichzeitig dem Verbraucher die Möglichkeit zu geben, den Ereignisstream jederzeit abzubrechen.
Die Lösung? Ein Ereignisiterator, der jeden Ereignisemitter (z. B. Redis Pub/Sub) in ein asynchrones Iterable umwandelt. Dadurch können wir Ereignisse kontrolliert verarbeiten und bei Bedarf problemlos abbrechen.
Lassen Sie uns in die Implementierung eintauchen.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Diese Funktion akzeptiert eine Abonnentenfunktion, die Sie in jeden Event-Emitter oder jedes Pub/Sub-System einbinden können. Der Abonnent stellt zwei wesentliche Methoden bereit:
Die Funktion gibt einen AsyncGenerator
Kontextobjekt:
Der Typ „Context
Ereigniswarteschlange:
Das Array events: T[] dient als Puffer zum Speichern ausgegebener Ereignisse. Der Generator verarbeitet diese Ereignisse nacheinander. Wenn sich keine Ereignisse in der Warteschlange befinden, wird auf die Ausgabe des nächsten Ereignisses gewartet.
Logik ausgeben:
Die Emit-Funktion fügt der Warteschlange neue Ereignisse hinzu und löst alle ausstehenden Versprechen auf (d. h. wenn der Generator auf neue Ereignisse wartet).
Stornierung:
Wenn die Abbruchfunktion aufgerufen wird, setzt sie ein Flag (abgebrochen = wahr), um zu signalisieren, dass die Schleife beendet werden soll. Alle verbleibenden Ereignisse in der Warteschlange werden noch verarbeitet, bevor der Generator abgeschlossen ist.
Aufräumen:
Nach dem Abbruch ruft der Generator die Abmeldefunktion auf (falls vorhanden), um alle erforderlichen Bereinigungen durchzuführen. Dies ist besonders wichtig, um sich von externen Systemen wie Redis abzumelden oder Ressourcen zu bereinigen.
Sehen wir uns an, wie wir diesen Ereignisiterator verwenden können, um Redis Pub/Sub abzuhören und die eingehenden Nachrichten asynchron zu iterieren.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
In diesem Beispiel verwenden wir createEventIterator, um einen Redis Pub/Sub-Kanal zu abonnieren und die Nachrichten asynchron zu durchlaufen. Jedes Mal, wenn eine neue Nachricht eintrifft, wird sie an den Generator gesendet, wo wir sie in Echtzeit verarbeiten können. Wenn eine bestimmte Nachricht (z. B. „STOP“) empfangen wird, unterbrechen wir die Schleife und melden uns von Redis ab.
So können Sie createEventIterator mit dem EventEmitter von Node.js verwenden:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
In diesem Beispiel:
Asynchrone Steuerung: Durch die Nutzung des AsyncGenerator können wir Ereignisse asynchron verarbeiten, sie in unserem eigenen Tempo verarbeiten und die Verarbeitung bei Bedarf anhalten.
Abbruch: Die Möglichkeit, den Ereignisstrom jederzeit abzubrechen, macht diesen Ansatz flexibel, insbesondere in realen Szenarien, in denen Verbindungen möglicherweise ordnungsgemäß geschlossen werden müssen.
Allgemeiner Zweck: Dieser Iterator kann für jeden Event-Emitter oder jedes Pub/Sub-System verwendet werden, wodurch er für verschiedene Anwendungen vielseitig einsetzbar ist.
Ereignisgesteuerte Architekturen sind ein Eckpfeiler vieler moderner Webanwendungen, aber ihre Verwaltung kann schwierig werden, wenn wir den Fluss von Ereignissen asynchron steuern müssen. Mit der Leistungsfähigkeit von AsyncGenerator in TypeScript können Sie elegante Lösungen wie diesen Event-Iterator erstellen, wodurch Ihr Event-Handling-Code sauberer und einfacher zu warten ist.
Ich hoffe, dieser Beitrag hilft Ihnen beim Einstieg in asynchrone Iteratoren für Ihre eigenen Event-Emitter. Wenn Sie Fragen oder Gedanken haben, teilen Sie diese gerne in den Kommentaren mit!
Haftungsausschluss: Alle bereitgestellten Ressourcen stammen teilweise aus dem Internet. Wenn eine Verletzung Ihres Urheberrechts oder anderer Rechte und Interessen vorliegt, erläutern Sie bitte die detaillierten Gründe und legen Sie einen Nachweis des Urheberrechts oder Ihrer Rechte und Interessen vor und senden Sie ihn dann an die E-Mail-Adresse: [email protected] Wir werden die Angelegenheit so schnell wie möglich für Sie erledigen.
Copyright© 2022 湘ICP备2022001581号-3