현대 웹 개발에서는 들어오는 WebSocket 메시지, 서버 전송 이벤트(SSE) 또는 Redis Pub/Sub와 같은 서비스의 데이터 스트림 처리 등 이벤트를 처리하는 경우가 많습니다. Node.js는 이벤트 기반 기능을 제공하지만 for wait...of 루프를 사용하여 이벤트를 비동기적으로 반복하는 기본 방법이 부족합니다.
이 게시물에서는 TypeScript와 AsyncGenerator를 사용하여 비동기 이벤트 반복자를 생성하는 간단하면서도 강력한 방법을 안내해 드리겠습니다. 이 접근 방식은 취소 및 정리 논리를 완벽하게 제어하면서 깔끔하고 예측 가능한 방식으로 모든 종류의 이벤트 이미터의 이벤트를 사용할 수 있도록 설계되었습니다.
최근 프로젝트 중 하나에서는 Redis Pub/Sub 채널을 수신하고 SSE(서버 전송 이벤트)를 연결된 클라이언트에 비동기식으로 전달해야 했습니다. 문제는 소비자가 언제든지 이벤트 스트림을 취소할 수 있도록 하면서 시스템에 부담을 주지 않고 들어오는 이벤트를 처리하는 것이었습니다.
해결책은요? 이벤트 이미터(예: Redis Pub/Sub)를 비동기 반복 가능 항목으로 변환하는 이벤트 반복기입니다. 이를 통해 제어된 방식으로 이벤트를 처리하고 필요한 경우 취소를 적절하게 처리할 수 있습니다.
구현에 대해 자세히 살펴보겠습니다.
export type Context= { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise ; export type Subscriber = ( context: Context , ) => void | CleanupFn | Promise ; export async function* createEventIterator ( subscriber: Subscriber , ): AsyncGenerator { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise ((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
이 함수는 이벤트 이미터 또는 게시/구독 시스템에 연결할 수 있는 구독자 기능을 허용합니다. 구독자는 두 가지 필수 방법을 제공합니다:
이 함수는 AsyncGenerator
컨텍스트 객체:
Context
이벤트 대기열:
이벤트: T[] 배열은 발생한 이벤트를 저장하는 버퍼 역할을 합니다. 생성기는 이러한 이벤트를 하나씩 처리합니다. 대기열에 이벤트가 없으면 다음 이벤트가 방출될 때까지 기다립니다.
논리 방출:
내보내기 기능은 대기열에 새 이벤트를 추가하고 보류 중인 약속을 해결합니다(즉, 생성기가 새 이벤트를 기다리는 경우).
해제:
취소 함수가 호출되면 플래그(cancelled = true)를 설정하여 루프가 종료되어야 한다는 신호를 보냅니다. 대기열에 남아 있는 모든 이벤트는 생성기가 완료되기 전에 계속 처리됩니다.
대청소:
취소 후 생성기는 구독 취소 기능(제공된 경우)을 호출하여 필요한 정리를 수행합니다. 이는 Redis와 같은 외부 시스템의 구독을 취소하거나 리소스를 정리할 때 특히 중요합니다.
이 이벤트 반복자를 사용하여 Redis Pub/Sub를 수신하고 수신 메시지를 비동기적으로 반복하는 방법을 살펴보겠습니다.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
이 예에서는 createEventIterator를 사용하여 Redis Pub/Sub 채널을 구독하고 메시지를 비동기식으로 반복합니다. 새 메시지가 도착할 때마다 생성기로 내보내져 실시간으로 처리할 수 있습니다. 특정 메시지(예: "STOP")가 수신되면 루프를 중단하고 Redis 구독을 취소합니다.
Node.js의 EventEmitter와 함께 createEventIterator를 사용하는 방법은 다음과 같습니다.
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
이 예에서는:
비동기 제어: AsyncGenerator를 활용하면 이벤트를 비동기적으로 처리하고, 원하는 속도로 처리하고, 필요할 때 처리를 일시 중지할 수 있습니다.
취소: 언제든지 이벤트 스트림을 취소할 수 있으므로 이 접근 방식이 유연해지며, 특히 연결을 정상적으로 닫아야 할 수 있는 실제 시나리오에서 더욱 그렇습니다.
일반 용도: 이 반복자는 모든 이벤트 이미터 또는 Pub/Sub 시스템에 사용할 수 있으므로 다양한 애플리케이션에 다용도로 사용할 수 있습니다.
이벤트 중심 아키텍처는 많은 최신 웹 애플리케이션의 초석이지만 이벤트 흐름을 비동기적으로 제어해야 할 때 관리하기 까다로울 수 있습니다. TypeScript의 AsyncGenerator 기능을 사용하면 이 이벤트 반복기와 같은 우아한 솔루션을 구축하여 이벤트 처리 코드를 더 깔끔하고 유지 관리하기 쉽게 만들 수 있습니다.
이 게시물이 여러분의 이벤트 이미터에 대한 비동기 반복자를 시작하는 데 도움이 되기를 바랍니다. 질문이나 의견이 있으시면 댓글로 자유롭게 공유해주세요!
부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.
Copyright© 2022 湘ICP备2022001581号-3