이벤트 기반 애플리케이션을 위한 마이크로서비스 아키텍처를 설계할 때 Apache Kafka와 Node.js를 통합하면 실시간 데이터 처리 기능이 크게 향상될 수 있습니다. . 이 문서에서는 Kafka Node.js 통합을 활용하여 스트리밍 데이터를 효율적으로 처리하는 강력하고 확장 가능한 마이크로서비스를 구축하는 방법을 살펴보겠습니다.
마이크로서비스 아키텍처에서 서비스는 서로 효율적으로 통신해야 합니다. Apache Kafka는 마이크로서비스 간 실시간 데이터 교환을 가능하게 하는 분산 이벤트 스트리밍 플랫폼 역할을 합니다. 서비스를 분리하여 대용량 데이터를 처리하면서 독립적으로 작동할 수 있도록 합니다.
마이크로서비스 환경에서 Apache Kafka와 Node.js를 통합하려면 Kafka를 메시지 브로커로 설정하고 이를 Node.js 서비스와 연결해야 합니다. 단계별 가이드는 다음과 같습니다.
먼저, Apache Kafka 및 Node.js가 시스템에 설치되어 있는지 확인하세요. 다음 문서에 따라 Kafka 및 Node.js를 설치할 수 있습니다.
Node.js를 Kafka와 연결하려면 Node.js용 인기 Kafka 클라이언트인 kafkajs 라이브러리를 사용할 수 있습니다.
npm install kafkajs
마이크로서비스 아키텍처에서 Kafka 생산자는 Kafka 주제에 메시지를 보내는 일을 담당합니다. 다음은 Node.js에서 Kafka 생산자를 생성하는 방법에 대한 간단한 예입니다.
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
Kafka 소비자는 Kafka 주제에서 메시지를 읽는 데 사용됩니다. 소비자를 생성하는 방법은 다음과 같습니다.
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
마이크로서비스 아키텍처에서 Kafka와 Node.js의 통합을 설명하려면 다음 사례 연구를 고려하세요.
두 가지 마이크로서비스가 있습니다:
주문 서비스에서 구매 또는 거래가 발생할 때마다 제품 서비스의 재고가 업데이트됩니다. Kafka는 메시지 브로커 역할을 하여 이러한 통신을 용이하게 합니다.
주문 서비스는 구매 주문을 처리하고 재고 업데이트를 위해 제품 서비스에 메시지를 보내는 일을 담당합니다. Kafka 생산자로서 주문 서비스를 구현하는 방법은 다음과 같습니다.
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });
제품 서비스는 product-updates Kafka 주제의 메시지를 사용하고 이에 따라 제품 재고를 업데이트합니다. 구현은 다음과 같습니다.
// productService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka consumer configuration const kafka = new Kafka({ clientId: 'product-service', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'product-group' }); // Initialize Express app const app = express(); app.use(express.json()); const updateStock = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'product-updates', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const orderEvent = JSON.parse(message.value.toString()); console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`); // Simulate stock update console.log(`Updating stock for product: ${orderEvent.productId}`); // logic to update stock }, }); }; // Start the Product Service to listen for messages updateStock().catch(console.error); // Start the server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.log(`Product Service API running on port ${PORT}`); });
수신 메시지를 수신해야 하므로 제품 서비스를 먼저 시작하세요.
node productService.js
제품 서비스는 포트 3001(또는 지정된 경우 다른 포트)에서 수신 대기를 시작합니다.
다음 명령으로 주문 서비스를 시작하세요.
node orderService.js
주문 서비스는 포트 3000(또는 지정된 경우 다른 포트)에서 사용할 수 있습니다.
주문 서비스 API에 POST 요청을 보내 주문할 수 있습니다.
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'
주문이 접수되면 주문 서비스가 Kafka 메시지를 보내고 제품 서비스는 해당 메시지를 사용하여 재고를 업데이트합니다.
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
Apache Kafka와 Node.js를 마이크로서비스 아키텍처에 통합하면 확장성과 탄력성이 뛰어난 이벤트 기반 애플리케이션을 구축할 수 있습니다.
모범 사례를 따르고 Kafka의 강력한 기능을 활용하면 실시간 데이터를 효율적으로 처리하고 마이크로서비스 간에 강력한 통신 계층을 생성할 수 있습니다.
부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.
Copyright© 2022 湘ICP备2022001581号-3