「労働者が自分の仕事をうまくやりたいなら、まず自分の道具を研ぎ澄まさなければなりません。」 - 孔子、「論語。陸霊公」
表紙 > プログラミング > 非同期ジェネレーターを使用した TypeScript でのイベント エミッターの非同期反復

非同期ジェネレーターを使用した TypeScript でのイベント エミッターの非同期反復

2024 年 11 月 8 日に公開
ブラウズ:517

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

導入

現代の Web 開発では、受信 WebSocket メッセージ、サーバー送信イベント (SSE)、または Redis Pub/Sub などのサービスからのデータ ストリームの処理など、イベントを扱うことがよくあります。 Node.js はイベント駆動型の機能を提供しますが、for await...of ループを使用してイベントを非同期に反復するすぐに使用できる方法がありません。

この投稿では、TypeScript と AsyncGenerator を使用して非同期イベント イテレーターを作成するシンプルかつ強力な方法を説明します。このアプローチは、キャ​​ンセルとクリーンアップ ロジックを完全に制御しながら、あらゆる種類のイベント エミッターからのイベントをクリーンかつ予測可能な方法で使用できるように設計されています。

ユースケース: Redis Pub/Sub

最近のプロジェクトの 1 つでは、Redis Pub/Sub チャネルをリッスンし、接続されているクライアントにサーバー送信イベント (SSE) を非同期的にディスパッチする必要がありました。課題は、コンシューマーがいつでもイベント ストリームをキャンセルできるようにしながら、システムに負荷をかけずに受信イベントを処理することでした。

解決策は?任意のイベント エミッター (Redis Pub/Sub など) を非同期反復可能に変換するイベント イテレーター。これにより、制御された方法でイベントを処理し、必要に応じてキャンセルを適切に処理できるようになります。

実装を見ていきましょう。

コード

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

仕組み

この関数は、任意のイベント エミッターまたはパブリッシュ/サブスクライブ システムにフックできるサブスクライバー関数を受け入れます。サブスクライバーは 2 つの重要なメソッドを提供します:

  1. emit: サブスクライバが新しいイベントをイテレータにプッシュできるようにします。
  2. cancel: 反復を停止する必要があることを通知する方法を提供します。

この関数は AsyncGenerator を返し、for await...of ループを使用してイベントを反復できるようにします。

コードを分解する

  1. コンテキスト オブジェクト:
    Context 型は、新しいイベントを発行したり、サブスクリプションをキャンセルしたりするためのインターフェイスを提供します。サブスクライバは、このコンテキストを使用してイベント フローを制御します。

  2. イベントキュー:
    events: T[] 配列は、発行されたイベントを格納するバッファとして機能します。ジェネレーターはこれらのイベントを 1 つずつ処理します。キューにイベントがない場合は、次のイベントが発行されるまで待機します。

  3. エミットロジック:
    Emit 関数は、新しいイベントをキューに追加し、保留中の Promise を解決します (つまり、ジェネレーターが新しいイベントを待っている場合)。

  4. キャンセル:
    cancel 関数が呼び出されると、ループを終了する必要があることを示すフラグ (canceled = true) が設定されます。キュー内に残っているイベントは、ジェネレーターが完了する前に処理されます。

  5. 掃除
    キャンセル後、ジェネレーターは unsubscribe 関数 (指定されている場合) を呼び出して、必要なクリーンアップを実行します。これは、Redis などの外部システムからのサブスクライブを解除したり、リソースをクリーンアップしたりする場合に特に重要です。

例: Redis Pub/Sub のリスニング

このイベント イテレータを使用して Redis Pub/Sub をリッスンし、受信メッセージを非同期に反復処理する方法を見てみましょう。

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

この例では、createEventIterator を使用して Redis Pub/Sub チャネルにサブスクライブし、メッセージを非同期的に反復処理します。新しいメッセージが到着するたびに、それはジェネレーターに出力され、そこでリアルタイムで処理できます。特定のメッセージ (「STOP」など) を受信した場合、ループを終了し、Redis からサブスクライブを解除します。

例: EventEmitter の使用

Node.js の EventEmitter で createEventIterator を使用する方法は次のとおりです:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

この例では:

  • EventEmitter を使用してイベントを発行し、createEventIterator によってキャプチャされます。
  • イテレータは「data」イベントをリッスンし、それを非同期的に処理します。
  • Redis の例と同様に、特定のイベント ('STOP') を受信したときに反復を停止できます。

このアプローチの利点

  • 非同期制御: AsyncGenerator を活用することで、イベントを非同期に処理し、自分のペースで処理し、必要に応じて処理を一時停止できます。

  • キャンセル: イベント ストリームをいつでもキャンセルできるため、特に接続を正常に閉じる必要がある現実のシナリオでは、このアプローチが柔軟になります。

  • 汎用: このイテレータは任意のイベント エミッタまたは Pub/Sub システムに使用できるため、さまざまなアプリケーションに多用途に使用できます。

結論

イベント駆動型アーキテクチャは、多くの最新の Web アプリケーションの基礎ですが、イベント フローを非同期に制御する必要がある場合、管理が難しくなる可能性があります。 TypeScript の AsyncGenerator の機能を利用すると、このイベント イテレータのような洗練されたソリューションを構築でき、イベント処理コードがよりクリーンになり、保守が容易になります。

この投稿が、独自のイベント エミッターの非同期イテレーターの使用を開始するのに役立つことを願っています。ご質問やご意見がございましたら、お気軽にコメント欄で共有してください。

リリースステートメント この記事は次の場所に転載されています: https://dev.to/redjohnsh/asynchronously-iteration-over-event-emitters-in-typescript-with-async-generators-3mk?1 侵害がある場合は、study_golang@163 までご連絡ください。 .comを削除してください
最新のチュートリアル もっと>

免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。

Copyright© 2022 湘ICP备2022001581号-3