"إذا أراد العامل أن يؤدي عمله بشكل جيد، فعليه أولاً أن يشحذ أدواته." - كونفوشيوس، "مختارات كونفوشيوس. لو لينجونج"
الصفحة الأمامية > برمجة > ربط كافكا مع جولانج

ربط كافكا مع جولانج

تم النشر بتاريخ 2024-11-04
تصفح:509

مقدمة

إذا كنت تريد معرفة أساسيات كافكا، مثل ميزاته الرئيسية ومكوناته ومزاياه، فلدي مقال يغطي ذلك هنا. يرجى مراجعته واتباع الخطوات حتى تنتهي من تثبيت كافكا باستخدام Docker لمتابعة الأقسام التالية.

Connect Kafka with Golang

الاتصال بكافكا مع جولانج

على غرار المثال الوارد في المقالة حول ربط كافكا مع NodeJS، يتضمن كود المصدر هذا أيضًا جزأين: تهيئة منتج لإرسال رسائل إلى كافكا واستخدام المستهلك للاشتراك في الرسائل من عنوان.

سأقوم بتقسيم الكود إلى أجزاء أصغر لفهم أفضل. أولا، دعونا نحدد القيم المتغيرة.

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

- هنا، يتم استخدام الحزمة github.com/confluentinc/confluent-kafka-go/kafka للاتصال بـ كافكا.

- الوسيط هو عنوان المضيف؛ إذا كنت تستخدم ZooKeeper، فاستبدل عنوان المضيف وفقًا لذلك.

- يمكن تغيير معرف المجموعة والموضوع حسب الحاجة.

الخطوة التالية هي تهيئة المنتج.


func startProducer() { ع، يخطئ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": Broker}) إذا أخطأت!= لا شيء { الذعر (خطأ) } اذهب للوظيفة () { لـ e := range p.Events() { التبديل EV := e.(نوع) { الحالة * كافكا. الرسالة: إذا كان ev.TopicPartition.Error != لا شيء { fmt.Printf("فشل التسليم: %v\n"، ev.TopicPartition) } آخر { fmt.Printf("تم تسليم الرسالة إلى %v\n"، ev.TopicPartition) } } } }() لـ _، الكلمة:= النطاق []string{"message 1"، "message 2"، "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{الموضوع: &topic، القسم: kafka.PartitionAny}، القيمة: [] بايت (كلمة)، }، لا شيء) } }
func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}
يتم استخدام الكود أعلاه لإرسال مجموعة من الرسائل

{"message 1"، "message 2"، "message 3"} إلى موضوع ويستخدم ]go-routine للتكرار من خلال الأحداث باستخدام for e := range p.Events() وطباعة نتيجة التسليم، سواء كانت النجاح أو الفشل.

التالي هو إنشاء

مستهلك من أجل الاشتراك في الموضوع وتلقي الرسائل.

func startConsumer() { ج، يخطئ := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": وسيط، "group.id": معرف المجموعة، "auto.offset.reset": "الأقدم"، }) إذا أخطأت!= لا شيء { الذعر (خطأ) } ج.الاشتراك (الموضوع، لا شيء) ل { رسالة، خطأ := c.ReadMessage(-1) إذا أخطأت == لا شيء { fmt.Printf ("الرسالة على %s: %s\n"، msg.TopicPartition، string(msg.Value)) } آخر { fmt.Printf("خطأ المستهلك: %v (%v)\n"، خطأ، رسالة) استراحة } } ج.إغلاق() }
func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}
أخيرًا، نظرًا لأن هذا مثال بسيط، قم باستدعاء الوظائف لإنشاء

المنتج والمستهلك للاستخدام. في سيناريو العالم الحقيقي، يتم عادةً نشر المنتج و المستهلك على خادمين مختلفين في نظام الخدمات الصغيرة.

الوظيفة الرئيسية () { بداية المنتج () بدء المستهلك () }
func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

Connect Kafka with Golang

برمجة سعيدة!


إذا وجدت هذا المحتوى مفيدًا، فيرجى زيارة المقالة الأصلية على مدونتي لدعم المؤلف واستكشاف المزيد من المحتوى المثير للاهتمام.

Connect Kafka with Golang Connect Kafka with Golang Connect Kafka with Golang Connect Kafka with Golang Connect Kafka with Golang

بعض المسلسلات التي قد تجدها مثيرة للاهتمام:

NodeJS
  •   رد فعل
  • عامل الميناء 
  • كوبرنيتس
بيان الافراج تم نشر هذه المقالة على: https://dev.to/chauhoangminhnguyen/connect-kafka-with-golang-3h4d?1 إذا كان هناك أي انتهاك، يرجى الاتصال بـ [email protected] لحذفه
أحدث البرنامج التعليمي أكثر>

تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.

Copyright© 2022 湘ICP备2022001581号-3