"일꾼이 일을 잘하려면 먼저 도구를 갈고 닦아야 한다." - 공자, 『논어』.
첫 장 > 프로그램 작성 > 비동기 생성기를 사용하여 TypeScript에서 이벤트 이미터를 비동기식으로 반복

비동기 생성기를 사용하여 TypeScript에서 이벤트 이미터를 비동기식으로 반복

2024-11-08에 게시됨
검색:986

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

소개

현대 웹 개발에서는 들어오는 WebSocket 메시지, 서버 전송 이벤트(SSE) 또는 Redis Pub/Sub와 같은 서비스의 데이터 스트림 처리 등 이벤트를 처리하는 경우가 많습니다. Node.js는 이벤트 기반 기능을 제공하지만 for wait...of 루프를 사용하여 이벤트를 비동기적으로 반복하는 기본 방법이 부족합니다.

이 게시물에서는 TypeScript와 AsyncGenerator를 사용하여 비동기 이벤트 반복자를 생성하는 간단하면서도 강력한 방법을 안내해 드리겠습니다. 이 접근 방식은 취소 및 정리 논리를 완벽하게 제어하면서 깔끔하고 예측 가능한 방식으로 모든 종류의 이벤트 이미터의 이벤트를 사용할 수 있도록 설계되었습니다.

사용 사례: Redis Pub/Sub

최근 프로젝트 중 하나에서는 Redis Pub/Sub 채널을 수신하고 SSE(서버 전송 이벤트)를 연결된 클라이언트에 비동기식으로 전달해야 했습니다. 문제는 소비자가 언제든지 이벤트 스트림을 취소할 수 있도록 하면서 시스템에 부담을 주지 않고 들어오는 이벤트를 처리하는 것이었습니다.

해결책은요? 이벤트 이미터(예: Redis Pub/Sub)를 비동기 반복 가능 항목으로 변환하는 이벤트 반복기입니다. 이를 통해 제어된 방식으로 이벤트를 처리하고 필요한 경우 취소를 적절하게 처리할 수 있습니다.

구현에 대해 자세히 살펴보겠습니다.

코드

export type Context = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise;

export type Subscriber = (
    context: Context,
) => void | CleanupFn | Promise;

export async function* createEventIterator(
    subscriber: Subscriber,
): AsyncGenerator {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

작동 방식

이 함수는 이벤트 이미터 또는 게시/구독 시스템에 연결할 수 있는 구독자 기능을 허용합니다. 구독자는 두 가지 필수 방법을 제공합니다:

  1. 방출: 구독자가 새 이벤트를 반복자에 푸시할 수 있습니다.
  2. 취소: 반복을 중지해야 한다는 신호를 보내는 방법을 제공합니다.

이 함수는 AsyncGenerator를 반환하므로 for wait...of 루프를 사용하여 이벤트를 반복할 수 있습니다.

코드 분석

  1. 컨텍스트 객체:
    Context 유형은 새 이벤트를 내보내거나 구독을 취소하는 인터페이스를 제공합니다. 구독자는 이 컨텍스트를 사용하여 이벤트 흐름을 제어합니다.

  2. 이벤트 대기열:
    이벤트: T[] 배열은 발생한 이벤트를 저장하는 버퍼 역할을 합니다. 생성기는 이러한 이벤트를 하나씩 처리합니다. 대기열에 이벤트가 없으면 다음 이벤트가 방출될 때까지 기다립니다.

  3. 논리 방출:
    내보내기 기능은 대기열에 새 이벤트를 추가하고 보류 중인 약속을 해결합니다(즉, 생성기가 새 이벤트를 기다리는 경우).

  4. 해제:
    취소 함수가 호출되면 플래그(cancelled = true)를 설정하여 루프가 종료되어야 한다는 신호를 보냅니다. 대기열에 남아 있는 모든 이벤트는 생성기가 완료되기 전에 계속 처리됩니다.

  5. 대청소:
    취소 후 생성기는 구독 취소 기능(제공된 경우)을 호출하여 필요한 정리를 수행합니다. 이는 Redis와 같은 외부 시스템의 구독을 취소하거나 리소스를 정리할 때 특히 중요합니다.

예: Redis Pub/Sub 듣기

이 이벤트 반복자를 사용하여 Redis Pub/Sub를 수신하고 수신 메시지를 비동기적으로 반복하는 방법을 살펴보겠습니다.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

이 예에서는 createEventIterator를 사용하여 Redis Pub/Sub 채널을 구독하고 메시지를 비동기식으로 반복합니다. 새 메시지가 도착할 때마다 생성기로 내보내져 실시간으로 처리할 수 있습니다. 특정 메시지(예: "STOP")가 수신되면 루프를 중단하고 Redis 구독을 취소합니다.

예: EventEmitter 사용

Node.js의 EventEmitter와 함께 createEventIterator를 사용하는 방법은 다음과 같습니다.

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

이 예에서는:

  • EventEmitter를 사용하여 createEventIterator에 의해 캡처되는 이벤트를 내보냅니다.
  • 반복자는 '데이터' 이벤트를 수신하고 이를 비동기적으로 처리합니다.
  • Redis 예시와 유사하게 특정 이벤트('STOP')가 수신되면 반복을 중지할 수 있습니다.

이 접근 방식의 이점

  • 비동기 제어: AsyncGenerator를 활용하면 이벤트를 비동기적으로 처리하고, 원하는 속도로 처리하고, 필요할 때 처리를 일시 중지할 수 있습니다.

  • 취소: 언제든지 이벤트 스트림을 취소할 수 있으므로 이 접근 방식이 유연해지며, 특히 연결을 정상적으로 닫아야 할 수 있는 실제 시나리오에서 더욱 그렇습니다.

  • 일반 용도: 이 반복자는 모든 이벤트 이미터 또는 Pub/Sub 시스템에 사용할 수 있으므로 다양한 애플리케이션에 다용도로 사용할 수 있습니다.

결론

이벤트 중심 아키텍처는 많은 최신 웹 애플리케이션의 초석이지만 이벤트 흐름을 비동기적으로 제어해야 할 때 관리하기 까다로울 수 있습니다. TypeScript의 AsyncGenerator 기능을 사용하면 이 이벤트 반복기와 같은 우아한 솔루션을 구축하여 이벤트 처리 코드를 더 깔끔하고 유지 관리하기 쉽게 만들 수 있습니다.

이 게시물이 여러분의 이벤트 이미터에 대한 비동기 반복자를 시작하는 데 도움이 되기를 바랍니다. 질문이나 의견이 있으시면 댓글로 자유롭게 공유해주세요!

릴리스 선언문 이 기사는 https://dev.to/redjohnsh/asynchronously-iteracing-over-event-emitters-in-typescript-with-async-generators-3mk?1에서 복제됩니다. 침해가 있는 경우, Study_golang@163으로 문의하시기 바랍니다. .com에서 삭제하세요
최신 튜토리얼 더>

부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.

Copyright© 2022 湘ICP备2022001581号-3