«Если рабочий хочет хорошо выполнять свою работу, он должен сначала заточить свои инструменты» — Конфуций, «Аналитики Конфуция. Лу Лингун»
титульная страница > программирование > Соедините Кафку с Голангом

Соедините Кафку с Голангом

Опубликовано 4 ноября 2024 г.
Просматривать:899

Введение

Если вам нужно знать основы Kafka, такие как его ключевые особенности, компоненты и преимущества, у меня есть статья, посвященная этому здесь. Просмотрите его и следуйте инструкциям, пока не завершите установку Kafka с помощью Docker, чтобы перейти к следующим разделам.

Connect Kafka with Golang

Подключение к Кафке с помощью Golang

Подобно примеру в статье о подключении Kafka к NodeJS, этот исходный код также включает две части: инициализацию продюсера для отправки сообщений в Kafka и используя потребителя для подписки на сообщения из темы.

Для лучшего понимания я разобью код на более мелкие части. Сначала давайте определим значения переменных.

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

 — Здесь пакет github.com/confluentinc/confluent-kafka-go/kafka используется для подключения к Kafka.

брокер — это адрес хоста; если вы используете ZooKeeper, замените адрес хоста соответствующим образом.

 – groupId и тема можно изменить по мере необходимости.

Далее инициализируется продюсер.

func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

Приведенный выше код используется для отправки массива сообщений {"message 1", "message 2", "message 3"} в тему и использует ]go-routine для перебора событий с помощью for e := range p.Events() и распечатайте результат доставки, независимо от того, успешен он или нет.

Следующим шагом является создание потребителя для подписки на тему и получения сообщений.

func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}

Наконец, поскольку это простой пример, вызовите функции для создания производителя и потребителя для использования. В реальном сценарии развертывание производителя и потребителя обычно выполняется на двух разных серверах в системе микросервисов

func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golang

Удачного программирования!


Если этот контент оказался для вас полезным, посетите оригинальную статью в моем блоге, чтобы поддержать автора и изучить более интересный контент.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


Некоторые сериалы, которые могут вас заинтересовать:

  • NodeJS
  •  Реагировать
  • Докер 
  • Кубернетес
Заявление о выпуске Эта статья воспроизведена по адресу: https://dev.to/chauhoangminhnguyen/connect-kafka-with-golang-3h4d?1. В случае нарушения авторских прав свяжитесь с [email protected], чтобы удалить ее.
Последний учебник Более>

Изучайте китайский

Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.

Copyright© 2022 湘ICP备2022001581号-3