」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 使用非同步產生器在 TypeScript 中非同步迭代事件發射器

使用非同步產生器在 TypeScript 中非同步迭代事件發射器

發佈於2024-11-08
瀏覽:788

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

介绍

在现代 Web 开发中,我们经常处理事件,无论是处理传入的 WebSocket 消息、服务器发送的事件 (SSE) 还是来自 Redis Pub/Sub 等服务的数据流。虽然 Node.js 提供了事件驱动的功能,但它缺乏一种开箱即用的方法来使用 for wait...of 循环异步迭代事件。

在这篇文章中,我将引导您通过一种简单而强大的方法来使用 TypeScript 和 AsyncGenerator 创建异步事件迭代器。这种方法旨在允许您以干净且可预测的方式使用来自任何类型事件发射器的事件,并完全控制取消和清理逻辑。

用例:Redis Pub/Sub

在我最近的一个项目中,我需要监听 Redis Pub/Sub 通道并将服务器发送的事件 (SSE) 异步分派给连接的客户端。面临的挑战是在不压垮系统的情况下处理传入事件,同时允许消费者随时取消事件流。

解决方案?一个事件迭代器,可将任何事件发射器(例如 Redis Pub/Sub)转换为异步迭代器。这使我们能够以受控的方式处理事件,并在必要时优雅地处理取消。

让我们深入了解实施。

守则

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

它是如何运作的

此函数接受订阅者函数,您可以将其挂接到任何事件发射器或发布/订阅系统中。订阅者提供了两个基本方法:

  1. emit:允许订阅者将新事件推送到迭代器中。
  2. 取消:提供一种方式来指示迭代应该停止。

该函数返回一个 AsyncGenerator,允许您使用 for wait...of 循环迭代事件。

分解代码

  1. 上下文对象:
    Context 类型提供了一个接口来发出新事件或取消订阅。订阅者使用此上下文来控制事件流。

  2. 事件队列:
    events: T[] 数组用作存储发出的事件的缓冲区。生成器将一一处理这些事件。如果队列中没有事件,它将等待下一个事件被发出。

  3. 发出逻辑:
    发出函数将新事件添加到队列并解决任何待处理的承诺(即,如果生成器正在等待新事件)。

  4. 消除
    如果调用 cancel 函数,它会设置一个标志 (cancelled = true) 来指示循环应该退出。在生成器完成之前,队列中的任何剩余事件仍将被处理。

  5. 清理
    取消后,生成器将调用取消订阅函数(如果提供)来执行任何必要的清理。这对于取消订阅 Redis 等外部系统或清理资源尤其重要。

示例:收听 Redis Pub/Sub

让我们看看如何使用此事件迭代器来监听 Redis Pub/Sub 并异步迭代传入的消息。

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

在此示例中,我们使用 createEventIterator 订阅 Redis Pub/Sub 通道并异步迭代消息。每次有新消息到达时,它都会被发送到生成器中,我们可以在那里实时处理它。如果收到特定消息(例如“STOP”),我们将中断循环并取消订阅 Redis。

示例:使用 EventEmitter

以下是如何将 createEventIterator 与 Node.js 的 EventEmitter 结合使用:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

在此示例中:

  • 我们使用EventEmitter来发出事件,这些事件由createEventIterator捕获。
  • 迭代器监听“data”事件并异步处理它。
  • 与Redis示例类似,我们可以在收到特定事件('STOP')时停止迭代。

这种方法的好处

  • 异步控制:通过利用AsyncGenerator,我们可以异步处理事件,按照自己的节奏处理它们,并在需要时暂停处理。

  • 取消:随时取消事件流的能力使这种方法变得灵活,特别是在可能需要正常关闭连接的现实场景中。

  • 通用:此迭代器可用于任何事件发射器或 Pub/Sub 系统,使其适用于不同的应用程序。

结论

事件驱动架构是许多现代 Web 应用程序的基石,但当我们需要异步控制事件流时,它们的管理可能会变得棘手。借助 TypeScript 中 AsyncGenerator 的强大功能,您可以构建像此事件迭代器这样的优雅解决方案,使您的事件处理代码更干净、更易于维护。

我希望这篇文章可以帮助您开始为您自己的事件发射器使用异步迭代器。如果您有任何问题或想法,请随时在评论中分享!

版本聲明 本文轉載於:https://dev.to/redjohnsh/asynchronously-iterating-over-event-emitters-in-typescript-with-async-generators-3mk?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 如何從具有不同數組長度的字典創建 Pandas DataFrame?
    如何從具有不同數組長度的字典創建 Pandas DataFrame?
    從條目長度不均勻的字典建立 DataFrame在 Python 中,可以從每個條目保存一個 Numpy 陣列的字典建立 DataFrame。然而,當條目之間的數組長度不同時,就會出現挑戰。預設情況下,Pandas 需要統一長度的數組,從而導致類似“ValueError: arrays must al...
    程式設計 發佈於2024-11-09
  • Bootstrap 4 Beta 中的列偏移發生了什麼事?
    Bootstrap 4 Beta 中的列偏移發生了什麼事?
    Bootstrap 4 Beta:列偏移的刪除和恢復Bootstrap 4 在其Beta 1 版本中引入了重大更改柱子偏移了。然而,隨著 Beta 2 的後續發布,這些變化已經逆轉。 從 offset-md-* 到 ml-auto在 Bootstrap 4 Beta 1 中, offset-md-*...
    程式設計 發佈於2024-11-09
  • 在 Go 中使用 WebSocket 進行即時通信
    在 Go 中使用 WebSocket 進行即時通信
    构建需要实时更新的应用程序(例如聊天应用程序、实时通知或协作工具)需要一种比传统 HTTP 更快、更具交互性的通信方法。这就是 WebSockets 发挥作用的地方!今天,我们将探讨如何在 Go 中使用 WebSocket,以便您可以向应用程序添加实时功能。 在这篇文章中,我们将介绍: WebSoc...
    程式設計 發佈於2024-11-09
  • 為什麼在 Java 中使用相同的種子時會得到相同的隨機數?
    為什麼在 Java 中使用相同的種子時會得到相同的隨機數?
    具有固定種子的Java隨機數:為什麼輸出相同? 在您的程式碼中,您定義了一種使用以下命令產生隨機數的方法指定的種子。但是,您會注意到,當您提供相同的種子時,所有 100 個產生的數字都是相同的。 此行為是預期的,因為在 Random 建構函式中使用相同的種子會產生可預測的數字序列。種子是初始化隨機數...
    程式設計 發佈於2024-11-09
  • jQuery Chaining 如何簡化開發並提高程式碼效率?
    jQuery Chaining 如何簡化開發並提高程式碼效率?
    理解 jQuery 中的物件和方法連結在 jQuery 中,連結允許在單一語句中串聯多個 jQuery 方法。這使開發人員能夠簡化程式碼並輕鬆執行複雜的操作。 連結的基本原理涉及每個 jQuery 方法的回傳值。當呼叫 jQuery 方法時,它通常會傳回一個表示所選元素的 jQuery 物件。這允許...
    程式設計 發佈於2024-11-09
  • Hono.js 基準測試:Node.js、Deno 和 Bun — 哪個最快?
    Hono.js 基準測試:Node.js、Deno 和 Bun — 哪個最快?
    Deno 2.0 剛剛發布,並聲稱比 Bun 和 Node.js 更快,同樣,Bun 也聲稱更快。這引起了我的興趣,所以我決定測試它們的性能,看看它們在現實場景中的比較。 為了公平比較,我需要選擇一個與所有三種 JavaScript 執行時間環境(Node.js、Deno 和 Bun)相容的框架。...
    程式設計 發佈於2024-11-09
  • 大批
    大批
    方法是可以在物件上呼叫的 fns 數組是對象,因此它們在 JS 中也有方法。 slice(begin):將陣列的一部分提取到新數組中,而不改變原始數組。 let arr = ['a','b','c','d','e']; // Usecase: Extract till index ...
    程式設計 發佈於2024-11-09
  • 何時應該使用 Tkinter 的 Entry Get 函數來有效檢索輸入?
    何時應該使用 Tkinter 的 Entry Get 函數來有效檢索輸入?
    Tkinter Entry 的Get 函數:深入探討其功能和用法在Tkinter 中,Entry 小部件通常用於收集用戶輸入以進一步收集使用者輸入然而,與Entry 關聯的get() 函數通常無法產生所需的結果,這可能會讓開發人員感到困惑。本文深入探討 get() 的概念,全面了解其執行與應用。 理...
    程式設計 發佈於2024-11-09
  • 如何克服 PHP 中日期表示的 2038 限制?
    如何克服 PHP 中日期表示的 2038 限制?
    PHP 中的日期表示:克服2038 年限制雖然PHP 的原生日期函數在2038 年有一個截止日期,但還有其他方法處理超出此限制的日期。其中一種方法是僅儲存年、月和日,而忽略小時、分鐘、秒和毫秒部分。 透過丟棄這些附加時間部分,可以顯著擴展可表示日期的範圍。這是因為這些元件中的每一個都佔用了 PHP ...
    程式設計 發佈於2024-11-09
  • 如何在 Go (Gorilla) 中向特定客戶端發送有針對性的 Websocket 更新?
    如何在 Go (Gorilla) 中向特定客戶端發送有針對性的 Websocket 更新?
    在Go (Gorilla) 中向特定客戶端發送Websocket 更新儘管是Go 新手,但您尋求有關實現Websocket 通信的指導您的預輸入項目。您已嘗試利用 Gorilla 的 GitHub 儲存庫中的範例,但在理解如何識別特定客戶端並針對 websocket 更新進行定位方面遇到了挑戰。 要...
    程式設計 發佈於2024-11-09
  • 使用swoole作為基於ESP6的腳本可程式控制器的雲端物聯網閘道框架
    使用swoole作為基於ESP6的腳本可程式控制器的雲端物聯網閘道框架
    腳本可程式控制器的本機功能基本上已完成,開始實現遠端相關功能。 遠端系統整體架構如下: 使用ESP8266的SDK實作tcp伺服器和tcp客戶端。 在tcp伺服器的基礎上編寫http協議解析程式碼,設計簡單的http伺服器,處理與瀏覽器的資料交互,包括內建網頁的下載,並使用ajax技術獲取狀態並...
    程式設計 發佈於2024-11-09
  • 為什麼在 Java 的 Random 類別中設定種子會傳回相同的數字?
    為什麼在 Java 的 Random 類別中設定種子會傳回相同的數字?
    Java隨機數產生:為什麼設定種子會回傳相同的數字? 儘管將Random類別的種子設定為特定值,但隨機數產生器始終會傳回相同的數字。讓我們探討一下可能導致此問題的原因。 了解 Random 類別和種子初始化Java Random 類別旨在產生偽隨機數。預設情況下,它使用其內部時鐘作為種子值,使其產生...
    程式設計 發佈於2024-11-09
  • 如何克服使用反射設定結構體欄位值時 SetCan() 總是傳回 False 的問題?
    如何克服使用反射設定結構體欄位值時 SetCan() 總是傳回 False 的問題?
    使用結構體的 SetString 探索反射反射提供了動態操作 Go 結構的強大工具。在此範例中,我們在嘗試使用反射來設定結構體欄位的值時遇到一個常見問題:CanSet() 始終傳回 false。這種障礙阻止了字段修改,使我們陷入困境。 識別陷阱提供的程式碼片段突顯了兩個基本錯誤:傳遞值而非指標: ...
    程式設計 發佈於2024-11-09
  • 為什麼 MySQL 中帶有子查詢的「IN」查詢很慢,如何提升效能?
    為什麼 MySQL 中帶有子查詢的「IN」查詢很慢,如何提升效能?
    MySQL 中帶有子查詢的緩慢「IN」查詢當使用子查詢時,使用「IN」運算子的MySQL查詢可能會表現出顯著的效能下降檢索「IN」子句的值很複雜。在這種情況下,用明確值取代子查詢結果會顯著縮短執行時間。 要了解此行為的原因,需要注意的是,每次評估「IN」查詢時,MySQL 都會執行子查詢。在提供的範...
    程式設計 發佈於2024-11-09
  • 如何使用WinAPI取得螢幕解析度?
    如何使用WinAPI取得螢幕解析度?
    使用 WinAPI 取得螢幕解析度在 WinAPI 中,存在多個函數來決定目前螢幕解析度。適當的選擇取決於具體要求。 檢索顯示尺寸檢索顯示尺寸檢索顯示尺寸 主監視器:使用GetSystemMetrics(SM_CXSCREEN) 和GetSystemMetrics( SM_CYCYSEN) 取得主顯...
    程式設計 發佈於2024-11-09

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3