Se você precisa conhecer o básico do Kafka, como seus principais recursos, componentes e vantagens, tenho um artigo que aborda isso aqui. Revise-o e siga as etapas até concluir a instalação do Kafka usando o Docker para prosseguir com as seções a seguir.
Semelhante ao exemplo do artigo sobre como conectar Kafka com NodeJS, este código-fonte também inclui duas partes: inicializar um produtor para enviar mensagens para Kafka e usar um consumidor para assinar mensagens de um tópico.
Vou dividir o código em partes menores para melhor compreensão. Primeiro, vamos definir os valores das variáveis.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- Aqui, o pacote github.com/confluentinc/confluent-kafka-go/kafka é usado para se conectar ao Kafka.
- O corretor é o endereço do host; se você estiver usando o ZooKeeper, substitua o endereço do host de acordo.
- O groupId e topic podem ser alterados conforme necessário.
A seguir, inicialize o produtor.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
O código acima é usado para enviar uma série de mensagens {"message 1", "message 2", "message 3"} para um tópico e usa um go-routine para iterar através de eventos com for e := range p.Events() e imprimir o resultado da entrega, seja um sucesso ou fracasso.
A seguir, crie um consumidor para se inscrever no tópico e receber mensagens.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Finalmente, como este é um exemplo simples, chame as funções para criar o produtor e o consumidor para uso. Em um cenário do mundo real, a implantação do produtor e do consumidor normalmente é feita em dois servidores diferentes em um sistema de microsserviços.
func main() { startProducer() startConsumer() }
Boa codificação!
Se você achou este conteúdo útil, visite o artigo original em meu blog para apoiar o autor e explorar conteúdo mais interessante.
Algumas séries que você pode achar interessantes:
Isenção de responsabilidade: Todos os recursos fornecidos são parcialmente provenientes da Internet. Se houver qualquer violação de seus direitos autorais ou outros direitos e interesses, explique os motivos detalhados e forneça prova de direitos autorais ou direitos e interesses e envie-a para o e-mail: [email protected]. Nós cuidaremos disso para você o mais rápido possível.
Copyright© 2022 湘ICP备2022001581号-3