"إذا أراد العامل أن يؤدي عمله بشكل جيد، فعليه أولاً أن يشحذ أدواته." - كونفوشيوس، "مختارات كونفوشيوس. لو لينجونج"
الصفحة الأمامية > برمجة > تنفيذ Kafka وNode.js في هندسة الخدمات الصغيرة

تنفيذ Kafka وNode.js في هندسة الخدمات الصغيرة

تم النشر بتاريخ 2024-08-26
تصفح:326

Implement Kafka and Node.js in Microservice Architecture

عند تصميم بنية الخدمات الصغيرة للتطبيقات المستندة إلى الأحداث، يمكن أن يؤدي دمج Apache Kafka وNode.js إلى تعزيز قدرات معالجة البيانات في الوقت الفعلي بشكل كبير . في هذه المقالة، سنستكشف كيفية الاستفادة من تكامل Kafka Node.js لبناء خدمات صغيرة قوية وقابلة للتطوير تتعامل مع تدفق البيانات بكفاءة.

لماذا نستخدم Apache Kafka في بنية الخدمات المصغرة؟

في بنية الخدمات الصغيرة، تحتاج الخدمات إلى التواصل مع بعضها البعض بكفاءة. يعمل Apache Kafka بمثابة منصة بث الأحداث الموزعة التي تتيح تبادل البيانات في الوقت الفعلي بين الخدمات الصغيرة. فهو يفصل الخدمات، مما يسمح لها بالعمل بشكل مستقل أثناء معالجة كميات كبيرة من البيانات.

فوائد كافكا في التطبيقات المبنية على الأحداث

  • قابلية التوسع: تدعم بنية كافكا الموزعة القياس الأفقي، مما يجعلها مثالية لمعالجة البيانات في الوقت الفعلي في التطبيقات المستندة إلى الأحداث.
  • التسامح مع الأخطاء: يضمن كافكا تسليم البيانات بشكل موثوق، حتى في حالة الفشل.
  • إنتاجية عالية: يستطيع كافكا التعامل مع ملايين الأحداث في الثانية، مما يوفر إنتاجية عالية لتطبيقات الخدمات الصغيرة المطلوبة.

إعداد تكامل Kafka Node.js

لدمج Apache Kafka وNode.js في بيئة الخدمات الصغيرة، ستحتاج إلى إعداد Kafka كوسيط رسائل وربطه بخدمات Node.js الخاصة بك. إليك دليل خطوة بخطوة:

قم بتثبيت كافكا وNode.js

أولاً، تأكد من تثبيت Apache Kafka وNode.js على نظامك. يمكنك تثبيت Kafka & Node.js باتباع المقالات التالية:

  • مقدمة إلى Node.js
  • البدء مع Apache Kafka
  • كيفية دمج Apache Kafka مع Node.js

تثبيت مكتبة عملاء Kafka Node.js

لربط Node.js مع Kafka، يمكنك استخدام مكتبة kafkajs، وهي عميل Kafka مشهور لـ Node.js.

npm install kafkajs

قم بإنشاء منتج كافكا في Node.js

في بنية الخدمات الصغيرة ، يكون منتج كافكا مسؤولاً عن إرسال الرسائل إلى موضوع كافكا. فيما يلي مثال بسيط لكيفية إنشاء منتج كافكا في Node.js:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

const sendMessage = async () => {
  await producer.connect();
  await producer.send({
    topic: 'my-topic',
    messages: [
      { value: 'Hello Kafka' },
    ],
  });
  await producer.disconnect();
};

sendMessage().catch(console.error);

قم بإنشاء مستهلك كافكا في Node.js

يتم استخدام مستهلك كافكا لقراءة الرسائل من موضوع كافكا. إليك كيفية إنشاء مستهلك:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

runConsumer().catch(console.error);

دراسة الحالة

لتوضيح تكامل Kafka وNode.js في بنية الخدمات الصغيرة، خذ بعين الاعتبار دراسة الحالة التالية:

سيناريو

لدينا خدمتين صغيرتين:

  1. خدمة الطلب: تتعامل مع طلبات العملاء.
  2. خدمة المنتج: إدارة مخزون المنتج.

عندما تتم عملية شراء أو معاملة في خدمة الطلب، سيتم تحديث المخزون في خدمة المنتج. يسهّل كافكا هذا التواصل من خلال العمل كوسيط للرسائل.

تطبيق

  1. خدمة الطلب: تنشر أحداث الطلب لموضوع تحديثات المنتج.
  2. خدمة المخزون: تستهلك الرسائل من موضوع تحديثات المنتج وتقوم بتحديث المخزون وفقًا لذلك.

طلب البرنامج النصي لمنتج الخدمة

خدمة الطلبات هي المسؤولة عن التعامل مع طلبات الشراء وإرسال الرسائل إلى خدمة المنتج لتحديث المخزون. إليك كيفية تنفيذ Order Service كمنتج كافكا:

// orderService.js const Express = require('express'); const {كافكا } = require('kafkajs'); // تكوين منتج كافكا كونست كافكا = كافكا الجديد({ معرف العميل: "خدمة الطلب"، الوسطاء: ['المضيف المحلي:9092']، }); منتج ثابت = kafka.producer(); // تهيئة التطبيق السريع تطبيق const = Express(); app.use(express.json()); const placeOrder = غير متزامن (معرف الطلب، معرف المنتج، الكمية) => { انتظر Producer.connect(); حدث أمر ثابت = { معرف الطلب, معرف المنتج, كمية، نوع الحدث: "ORDER_PLACED"، الطابع الزمني: Date.now(), }; في انتظار المنتج.إرسال({ الموضوع: "تحديثات المنتج"، الرسائل: [{ القيمة: JSON.stringify(orderEvent) }], }); انتظر المنتج. قطع الاتصال ()؛ console.log("تم تقديم الطلب: ${orderId} للمنتج: ${productId}`); }; // نقطة نهاية API لتقديم الطلب app.post('/order', async (req, res) => { const { معرف الطلب، معرف المنتج، الكمية } = req.body؛ إذا (!معرف الطلب || !معرف المنتج || !الكمية) { return res.status(400).json({ error: 'معرف الطلب أو معرف المنتج أو الكمية مفقود' }); } يحاول { في انتظار placeOrder(orderId, ProductId,quantity); res.status(200).json({ message: تم وضع الطلب ${orderId} بنجاح.` }); } التقاط (خطأ) { console.error('خطأ في تقديم الطلب:', خطأ); res.status(500).json({ خطأ: "فشل تقديم الطلب" }); } }); // ابدأ الخادم منفذ ثابت = معالجة.env.PORT || 3000؛ app.listen(PORT, () => { console.log("واجهة برمجة تطبيقات خدمة الطلب التي تعمل على المنفذ ${PORT}`); });
// orderService.js
const express = require('express');
const { Kafka } = require('kafkajs');

// Kafka producer configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();

// Initialize Express app
const app = express();
app.use(express.json());

const placeOrder = async (orderId, productId, quantity) => {
  await producer.connect();
  const orderEvent = {
    orderId,
    productId,
    quantity,
    eventType: 'ORDER_PLACED',
    timestamp: Date.now(),
  };
  await producer.send({
    topic: 'product-updates',
    messages: [{ value: JSON.stringify(orderEvent) }],
  });
  await producer.disconnect();
  console.log(`Order placed: ${orderId} for product: ${productId}`);
};

// API endpoint to place an order
app.post('/order', async (req, res) => {
  const { orderId, productId, quantity } = req.body;

  if (!orderId || !productId || !quantity) {
    return res.status(400).json({ error: 'Missing orderId, productId, or quantity' });
  }

  try {
    await placeOrder(orderId, productId, quantity);
    res.status(200).json({ message: `Order ${orderId} placed successfully.` });
  } catch (error) {
    console.error('Error placing order:', error);
    res.status(500).json({ error: 'Failed to place order' });
  }
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Order Service API running on port ${PORT}`);
});
البرنامج النصي لمستهلك خدمة المنتج

تستهلك خدمة المنتج

الرسائل من موضوع كافكا بتحديثات المنتج وتقوم بتحديث مخزون المنتج وفقًا لذلك. وإليكم التنفيذ:

// ProductService.js const Express = require('express'); const {كافكا } = require('kafkajs'); // تكوين المستهلك كافكا كونست كافكا = كافكا الجديد({ معرف العميل: "خدمة المنتج"، الوسطاء: ['المضيف المحلي:9092']، }); const Consumer = kafka.consumer({ معرف المجموعة: 'مجموعة المنتجات' }); // تهيئة التطبيق السريع تطبيق const = Express(); app.use(express.json()); const updateStock = async () => { انتظر Consumer.connect(); في انتظار المستهلك. الاشتراك ({ الموضوع: 'تحديثات المنتج'، من البداية: صحيح })؛ في انتظار Consumer.run({ everyMessage: غير متزامن ({ موضوع، قسم، رسالة }) => { const orderEvent = JSON.parse(message.value.toString()); console.log("الطلب المستلم: ${orderEvent.orderId}, المنتج: ${orderEvent.productId}, الكمية: ${orderEvent.quantity}`); // محاكاة تحديث المخزون console.log("تحديث مخزون المنتج: ${orderEvent.productId}`); // منطق تحديث المخزون }, }); }; // ابدأ خدمة المنتج للاستماع إلى الرسائل updateStock().catch(console.error); // ابدأ الخادم منفذ ثابت = معالجة.env.PORT || 3001؛ app.listen(PORT, () => { console.log(`واجهة برمجة تطبيقات خدمة المنتج التي تعمل على المنفذ ${PORT}`); });
// orderService.js
const express = require('express');
const { Kafka } = require('kafkajs');

// Kafka producer configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();

// Initialize Express app
const app = express();
app.use(express.json());

const placeOrder = async (orderId, productId, quantity) => {
  await producer.connect();
  const orderEvent = {
    orderId,
    productId,
    quantity,
    eventType: 'ORDER_PLACED',
    timestamp: Date.now(),
  };
  await producer.send({
    topic: 'product-updates',
    messages: [{ value: JSON.stringify(orderEvent) }],
  });
  await producer.disconnect();
  console.log(`Order placed: ${orderId} for product: ${productId}`);
};

// API endpoint to place an order
app.post('/order', async (req, res) => {
  const { orderId, productId, quantity } = req.body;

  if (!orderId || !productId || !quantity) {
    return res.status(400).json({ error: 'Missing orderId, productId, or quantity' });
  }

  try {
    await placeOrder(orderId, productId, quantity);
    res.status(200).json({ message: `Order ${orderId} placed successfully.` });
  } catch (error) {
    console.error('Error placing order:', error);
    res.status(500).json({ error: 'Failed to place order' });
  }
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Order Service API running on port ${PORT}`);
});
ابدأ

خدمة المنتج أولاً، لأنها تحتاج إلى الاستماع للرسائل الواردة:

عقدة ProductService.js
// orderService.js
const express = require('express');
const { Kafka } = require('kafkajs');

// Kafka producer configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();

// Initialize Express app
const app = express();
app.use(express.json());

const placeOrder = async (orderId, productId, quantity) => {
  await producer.connect();
  const orderEvent = {
    orderId,
    productId,
    quantity,
    eventType: 'ORDER_PLACED',
    timestamp: Date.now(),
  };
  await producer.send({
    topic: 'product-updates',
    messages: [{ value: JSON.stringify(orderEvent) }],
  });
  await producer.disconnect();
  console.log(`Order placed: ${orderId} for product: ${productId}`);
};

// API endpoint to place an order
app.post('/order', async (req, res) => {
  const { orderId, productId, quantity } = req.body;

  if (!orderId || !productId || !quantity) {
    return res.status(400).json({ error: 'Missing orderId, productId, or quantity' });
  }

  try {
    await placeOrder(orderId, productId, quantity);
    res.status(200).json({ message: `Order ${orderId} placed successfully.` });
  } catch (error) {
    console.error('Error placing order:', error);
    res.status(500).json({ error: 'Failed to place order' });
  }
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Order Service API running on port ${PORT}`);
});
ستبدأ خدمة المنتج

في الاستماع على المنفذ 3001 (أو منفذ آخر إذا تم تحديده). ابدأ

خدمة الطلب

بهذا الأمر:
عقدة orderService.js

node orderService.js
خدمة الطلب

متاحة على المنفذ 3000 (أو منفذ آخر إذا تم تحديده). يمكنك تقديم طلب عن طريق إرسال طلب POST إلى

واجهة برمجة تطبيقات خدمة الطلب

:
curl -X POST http://localhost:3000/order \ -H "نوع المحتوى: application/json" \ -د '{ "معرف الطلب": "الطلب-789"، "معرف المنتج": "المنتج-123"، "الكمية": 5 }'

curl -X POST http://localhost:3000/order \
-H "Content-Type: application/json" \
-d '{
  "orderId": "order-789",
  "productId": "product-123",
  "quantity": 5
}'
خدمة الطلب

رسالة كافكا، وستستهلك خدمة المنتج تلك الرسالة لتحديث المخزون:
الطلب المستلم: الطلب-789، المنتج: المنتج-123، الكمية: 5 تحديث المخزون للمنتج: المنتج-123

Received order: order-789, Product: product-123, Quantity: 5
Updating stock for product: product-123

يتيح لك دمج

Apache Kafka وNode.js

في بنية الخدمات الصغيرة الخاصة بك إنشاء تطبيقات تعتمد على الأحداث قابلة للتطوير ومرنة للغاية. من خلال اتباع أفضل الممارسات والاستفادة من ميزات كافكا القوية، يمكنك معالجة

البيانات في الوقت الفعلي

بكفاءة وإنشاء طبقة اتصال قوية بين خدماتك الصغيرة.

بيان الافراج تم نشر هذه المقالة على: https://dev.to/codeoun/implement-kafka-and-nodejs-in-microservice-architecture-5h0h?1 إذا كان هناك أي انتهاك، يرجى الاتصال بـ [email protected] لحذفه
أحدث البرنامج التعليمي أكثر>

تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.

Copyright© 2022 湘ICP备2022001581号-3