在现代 Web 开发中,我们经常处理事件,无论是处理传入的 WebSocket 消息、服务器发送的事件 (SSE) 还是来自 Redis Pub/Sub 等服务的数据流。虽然 Node.js 提供了事件驱动的功能,但它缺乏一种开箱即用的方法来使用 for wait...of 循环异步迭代事件。
在这篇文章中,我将引导您通过一种简单而强大的方法来使用 TypeScript 和 AsyncGenerator 创建异步事件迭代器。这种方法旨在允许您以干净且可预测的方式使用来自任何类型事件发射器的事件,并完全控制取消和清理逻辑。
在我最近的一个项目中,我需要监听 Redis Pub/Sub 通道并将服务器发送的事件 (SSE) 异步分派给连接的客户端。面临的挑战是在不压垮系统的情况下处理传入事件,同时允许消费者随时取消事件流。
解决方案?一个事件迭代器,可将任何事件发射器(例如 Redis Pub/Sub)转换为异步迭代器。这使我们能够以受控的方式处理事件,并在必要时优雅地处理取消。
让我们深入了解实施。
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
此函数接受订阅者函数,您可以将其挂接到任何事件发射器或发布/订阅系统中。订阅者提供了两个基本方法:
该函数返回一个 AsyncGenerator
上下文对象:
Context
事件队列:
events: T[] 数组用作存储发出的事件的缓冲区。生成器将一一处理这些事件。如果队列中没有事件,它将等待下一个事件被发出。
发出逻辑:
发出函数将新事件添加到队列并解决任何待处理的承诺(即,如果生成器正在等待新事件)。
消除:
如果调用 cancel 函数,它会设置一个标志 (cancelled = true) 来指示循环应该退出。在生成器完成之前,队列中的任何剩余事件仍将被处理。
清理:
取消后,生成器将调用取消订阅函数(如果提供)来执行任何必要的清理。这对于取消订阅 Redis 等外部系统或清理资源尤其重要。
让我们看看如何使用此事件迭代器来监听 Redis Pub/Sub 并异步迭代传入的消息。
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
在此示例中,我们使用 createEventIterator 订阅 Redis Pub/Sub 通道并异步迭代消息。每次有新消息到达时,它都会被发送到生成器中,我们可以在那里实时处理它。如果收到特定消息(例如“STOP”),我们将中断循环并取消订阅 Redis。
以下是如何将 createEventIterator 与 Node.js 的 EventEmitter 结合使用:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
在此示例中:
异步控制:通过利用AsyncGenerator,我们可以异步处理事件,按照自己的节奏处理它们,并在需要时暂停处理。
取消:随时取消事件流的能力使这种方法变得灵活,特别是在可能需要正常关闭连接的现实场景中。
通用:此迭代器可用于任何事件发射器或 Pub/Sub 系统,使其适用于不同的应用程序。
事件驱动架构是许多现代 Web 应用程序的基石,但当我们需要异步控制事件流时,它们的管理可能会变得棘手。借助 TypeScript 中 AsyncGenerator 的强大功能,您可以构建像此事件迭代器这样的优雅解决方案,使您的事件处理代码更干净、更易于维护。
我希望这篇文章可以帮助您开始为您自己的事件发射器使用异步迭代器。如果您有任何问题或想法,请随时在评论中分享!
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3