Если вам нужно знать основы Kafka, такие как его ключевые особенности, компоненты и преимущества, у меня есть статья, посвященная этому здесь. Просмотрите его и следуйте инструкциям, пока не завершите установку Kafka с помощью Docker, чтобы перейти к следующим разделам.
Подобно примеру в статье о подключении Kafka к NodeJS, этот исходный код также включает две части: инициализацию продюсера для отправки сообщений в Kafka и используя потребителя для подписки на сообщения из темы.
Для лучшего понимания я разобью код на более мелкие части. Сначала давайте определим значения переменных.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
— Здесь пакет github.com/confluentinc/confluent-kafka-go/kafka используется для подключения к Kafka.
— брокер — это адрес хоста; если вы используете ZooKeeper, замените адрес хоста соответствующим образом.
– groupId и тема можно изменить по мере необходимости.
Далее инициализируется продюсер.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
Приведенный выше код используется для отправки массива сообщений {"message 1", "message 2", "message 3"} в тему и использует ]go-routine для перебора событий с помощью for e := range p.Events() и распечатайте результат доставки, независимо от того, успешен он или нет.
Следующим шагом является создание потребителя для подписки на тему и получения сообщений.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Наконец, поскольку это простой пример, вызовите функции для создания производителя и потребителя для использования. В реальном сценарии развертывание производителя и потребителя обычно выполняется на двух разных серверах в системе микросервисов
func main() { startProducer() startConsumer() }
Удачного программирования!
Если этот контент оказался для вас полезным, посетите оригинальную статью в моем блоге, чтобы поддержать автора и изучить более интересный контент.
Некоторые сериалы, которые могут вас заинтересовать:
Отказ от ответственности: Все предоставленные ресурсы частично взяты из Интернета. В случае нарушения ваших авторских прав или других прав и интересов, пожалуйста, объясните подробные причины и предоставьте доказательства авторских прав или прав и интересов, а затем отправьте их по электронной почте: [email protected]. Мы сделаем это за вас как можно скорее.
Copyright© 2022 湘ICP备2022001581号-3