إذا كنت تريد معرفة أساسيات كافكا، مثل ميزاته الرئيسية ومكوناته ومزاياه، فلدي مقال يغطي ذلك هنا. يرجى مراجعته واتباع الخطوات حتى تنتهي من تثبيت كافكا باستخدام Docker لمتابعة الأقسام التالية.
على غرار المثال الوارد في المقالة حول ربط كافكا مع NodeJS، يتضمن كود المصدر هذا أيضًا جزأين: تهيئة منتج لإرسال رسائل إلى كافكا واستخدام المستهلك للاشتراك في الرسائل من عنوان.
سأقوم بتقسيم الكود إلى أجزاء أصغر لفهم أفضل. أولا، دعونا نحدد القيم المتغيرة.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- هنا، يتم استخدام الحزمة github.com/confluentinc/confluent-kafka-go/kafka للاتصال بـ كافكا.
- الوسيط هو عنوان المضيف؛ إذا كنت تستخدم ZooKeeper، فاستبدل عنوان المضيف وفقًا لذلك.
- يمكن تغيير معرف المجموعة والموضوع حسب الحاجة.
الخطوة التالية هي تهيئة المنتج.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }يتم استخدام الكود أعلاه لإرسال مجموعة من الرسائل
{"message 1"، "message 2"، "message 3"} إلى موضوع ويستخدم ]go-routine للتكرار من خلال الأحداث باستخدام for e := range p.Events() وطباعة نتيجة التسليم، سواء كانت النجاح أو الفشل.
التالي هو إنشاءمستهلك من أجل الاشتراك في الموضوع وتلقي الرسائل.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }أخيرًا، نظرًا لأن هذا مثال بسيط، قم باستدعاء الوظائف لإنشاء
المنتج والمستهلك للاستخدام. في سيناريو العالم الحقيقي، يتم عادةً نشر المنتج و المستهلك على خادمين مختلفين في نظام الخدمات الصغيرة.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
برمجة سعيدة!
إذا وجدت هذا المحتوى مفيدًا، فيرجى زيارة المقالة الأصلية على مدونتي لدعم المؤلف واستكشاف المزيد من المحتوى المثير للاهتمام.
NodeJS
تنصل: جميع الموارد المقدمة هي جزئيًا من الإنترنت. إذا كان هناك أي انتهاك لحقوق الطبع والنشر الخاصة بك أو الحقوق والمصالح الأخرى، فيرجى توضيح الأسباب التفصيلية وتقديم دليل على حقوق الطبع والنشر أو الحقوق والمصالح ثم إرسالها إلى البريد الإلكتروني: [email protected]. سوف نتعامل مع الأمر لك في أقرب وقت ممكن.
Copyright© 2022 湘ICP备2022001581号-3