現代の Web 開発では、受信 WebSocket メッセージ、サーバー送信イベント (SSE)、または Redis Pub/Sub などのサービスからのデータ ストリームの処理など、イベントを扱うことがよくあります。 Node.js はイベント駆動型の機能を提供しますが、for await...of ループを使用してイベントを非同期に反復するすぐに使用できる方法がありません。
この投稿では、TypeScript と AsyncGenerator を使用して非同期イベント イテレーターを作成するシンプルかつ強力な方法を説明します。このアプローチは、キャンセルとクリーンアップ ロジックを完全に制御しながら、あらゆる種類のイベント エミッターからのイベントをクリーンかつ予測可能な方法で使用できるように設計されています。
最近のプロジェクトの 1 つでは、Redis Pub/Sub チャネルをリッスンし、接続されているクライアントにサーバー送信イベント (SSE) を非同期的にディスパッチする必要がありました。課題は、コンシューマーがいつでもイベント ストリームをキャンセルできるようにしながら、システムに負荷をかけずに受信イベントを処理することでした。
解決策は?任意のイベント エミッター (Redis Pub/Sub など) を非同期反復可能に変換するイベント イテレーター。これにより、制御された方法でイベントを処理し、必要に応じてキャンセルを適切に処理できるようになります。
実装を見ていきましょう。
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
この関数は、任意のイベント エミッターまたはパブリッシュ/サブスクライブ システムにフックできるサブスクライバー関数を受け入れます。サブスクライバーは 2 つの重要なメソッドを提供します:
この関数は AsyncGenerator
コンテキスト オブジェクト:
Context
イベントキュー:
events: T[] 配列は、発行されたイベントを格納するバッファとして機能します。ジェネレーターはこれらのイベントを 1 つずつ処理します。キューにイベントがない場合は、次のイベントが発行されるまで待機します。
エミットロジック:
Emit 関数は、新しいイベントをキューに追加し、保留中の Promise を解決します (つまり、ジェネレーターが新しいイベントを待っている場合)。
キャンセル:
cancel 関数が呼び出されると、ループを終了する必要があることを示すフラグ (canceled = true) が設定されます。キュー内に残っているイベントは、ジェネレーターが完了する前に処理されます。
掃除:
キャンセル後、ジェネレーターは unsubscribe 関数 (指定されている場合) を呼び出して、必要なクリーンアップを実行します。これは、Redis などの外部システムからのサブスクライブを解除したり、リソースをクリーンアップしたりする場合に特に重要です。
このイベント イテレータを使用して Redis Pub/Sub をリッスンし、受信メッセージを非同期に反復処理する方法を見てみましょう。
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
この例では、createEventIterator を使用して Redis Pub/Sub チャネルにサブスクライブし、メッセージを非同期的に反復処理します。新しいメッセージが到着するたびに、それはジェネレーターに出力され、そこでリアルタイムで処理できます。特定のメッセージ (「STOP」など) を受信した場合、ループを終了し、Redis からサブスクライブを解除します。
Node.js の EventEmitter で createEventIterator を使用する方法は次のとおりです:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
この例では:
非同期制御: AsyncGenerator を活用することで、イベントを非同期に処理し、自分のペースで処理し、必要に応じて処理を一時停止できます。
キャンセル: イベント ストリームをいつでもキャンセルできるため、特に接続を正常に閉じる必要がある現実のシナリオでは、このアプローチが柔軟になります。
汎用: このイテレータは任意のイベント エミッタまたは Pub/Sub システムに使用できるため、さまざまなアプリケーションに多用途に使用できます。
イベント駆動型アーキテクチャは、多くの最新の Web アプリケーションの基礎ですが、イベント フローを非同期に制御する必要がある場合、管理が難しくなる可能性があります。 TypeScript の AsyncGenerator の機能を利用すると、このイベント イテレータのような洗練されたソリューションを構築でき、イベント処理コードがよりクリーンになり、保守が容易になります。
この投稿が、独自のイベント エミッターの非同期イテレーターの使用を開始するのに役立つことを願っています。ご質問やご意見がございましたら、お気軽にコメント欄で共有してください。
免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。
Copyright© 2022 湘ICP备2022001581号-3