عند تصميم بنية الخدمات الصغيرة للتطبيقات المستندة إلى الأحداث، يمكن أن يؤدي دمج Apache Kafka وNode.js إلى تعزيز قدرات معالجة البيانات في الوقت الفعلي بشكل كبير . في هذه المقالة، سنستكشف كيفية الاستفادة من تكامل Kafka Node.js لبناء خدمات صغيرة قوية وقابلة للتطوير تتعامل مع تدفق البيانات بكفاءة.
في بنية الخدمات الصغيرة، تحتاج الخدمات إلى التواصل مع بعضها البعض بكفاءة. يعمل Apache Kafka بمثابة منصة بث الأحداث الموزعة التي تتيح تبادل البيانات في الوقت الفعلي بين الخدمات الصغيرة. فهو يفصل الخدمات، مما يسمح لها بالعمل بشكل مستقل أثناء معالجة كميات كبيرة من البيانات.
لدمج Apache Kafka وNode.js في بيئة الخدمات الصغيرة، ستحتاج إلى إعداد Kafka كوسيط رسائل وربطه بخدمات Node.js الخاصة بك. إليك دليل خطوة بخطوة:
أولاً، تأكد من تثبيت Apache Kafka وNode.js على نظامك. يمكنك تثبيت Kafka & Node.js باتباع المقالات التالية:
لربط Node.js مع Kafka، يمكنك استخدام مكتبة kafkajs، وهي عميل Kafka مشهور لـ Node.js.
npm install kafkajs
في بنية الخدمات الصغيرة ، يكون منتج كافكا مسؤولاً عن إرسال الرسائل إلى موضوع كافكا. فيما يلي مثال بسيط لكيفية إنشاء منتج كافكا في Node.js:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
يتم استخدام مستهلك كافكا لقراءة الرسائل من موضوع كافكا. إليك كيفية إنشاء مستهلك:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
لتوضيح تكامل Kafka وNode.js في بنية الخدمات الصغيرة، خذ بعين الاعتبار دراسة الحالة التالية:
لدينا خدمتين صغيرتين:
عندما تتم عملية شراء أو معاملة في خدمة الطلب، سيتم تحديث المخزون في خدمة المنتج. يسهّل كافكا هذا التواصل من خلال العمل كوسيط للرسائل.
خدمة الطلبات هي المسؤولة عن التعامل مع طلبات الشراء وإرسال الرسائل إلى خدمة المنتج لتحديث المخزون. إليك كيفية تنفيذ Order Service كمنتج كافكا:
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });البرنامج النصي لمستهلك خدمة المنتج
الرسائل من موضوع كافكا بتحديثات المنتج وتقوم بتحديث مخزون المنتج وفقًا لذلك. وإليكم التنفيذ:
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });ابدأ
خدمة المنتج أولاً، لأنها تحتاج إلى الاستماع للرسائل الواردة:
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });ستبدأ خدمة المنتج
في الاستماع على المنفذ 3001 (أو منفذ آخر إذا تم تحديده). ابدأ
خدمة الطلب بهذا الأمر:
عقدة orderService.js
node orderService.jsخدمة الطلب
متاحة على المنفذ 3000 (أو منفذ آخر إذا تم تحديده). يمكنك تقديم طلب عن طريق إرسال طلب POST إلى
واجهة برمجة تطبيقات خدمة الطلب:
curl -X POST http://localhost:3000/order \
-H "نوع المحتوى: application/json" \
-د '{
"معرف الطلب": "الطلب-789"،
"معرف المنتج": "المنتج-123"،
"الكمية": 5
}'
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'خدمة الطلب
رسالة كافكا، وستستهلك خدمة المنتج تلك الرسالة لتحديث المخزون:
الطلب المستلم: الطلب-789، المنتج: المنتج-123، الكمية: 5
تحديث المخزون للمنتج: المنتج-123
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
في بنية الخدمات الصغيرة الخاصة بك إنشاء تطبيقات تعتمد على الأحداث قابلة للتطوير ومرنة للغاية. من خلال اتباع أفضل الممارسات والاستفادة من ميزات كافكا القوية، يمكنك معالجة
البيانات في الوقت الفعليبكفاءة وإنشاء طبقة اتصال قوية بين خدماتك الصغيرة.
تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.
Copyright© 2022 湘ICP备2022001581号-3