Si necesita conocer los conceptos básicos de Kafka, como sus características clave, componentes y ventajas, tengo un artículo que lo cubre aquí. Revíselo y siga los pasos hasta que haya completado la instalación de Kafka usando Docker para continuar con las siguientes secciones.
Similar al ejemplo del artículo sobre cómo conectar Kafka con NodeJS, este código fuente también incluye dos partes: inicializar un productor para enviar mensajes a Kafka y usar un consumidor para suscribirse a mensajes de un tema.
Dividiré el código en partes más pequeñas para una mejor comprensión. Primero, definamos los valores de las variables.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- Aquí, el paquete github.com/confluentinc/confluent-kafka-go/kafka se utiliza para conectarse a Kafka.
- El broker es la dirección del host; si está utilizando ZooKeeper, reemplace la dirección del host en consecuencia.
- El groupId y el tema se pueden cambiar según sea necesario.
Lo siguiente es inicializar el productor.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
El código anterior se usa para enviar una serie de mensajes {"mensaje 1", "mensaje 2", "mensaje 3"} a un tema y usa un go-routine para iterar a través de eventos con for e := range p.Events() e imprimir el resultado de la entrega, ya sea un Éxito o fracaso.
Lo siguiente es crear un consumidor para suscribir al tema y recibir mensajes.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Finalmente, dado que este es un ejemplo simple, llame a las funciones para crear el productor y el consumidor para su uso. En un escenario del mundo real, la implementación del productor y del consumidor generalmente se realiza en dos servidores diferentes en un sistema de microservicios.
func main() { startProducer() startConsumer() }
¡Feliz codificación!
Si este contenido te resultó útil, visita el artículo original en mi blog para apoyar al autor y explorar más contenido interesante.
Algunas series que te pueden resultar interesantes:
Descargo de responsabilidad: Todos los recursos proporcionados provienen en parte de Internet. Si existe alguna infracción de sus derechos de autor u otros derechos e intereses, explique los motivos detallados y proporcione pruebas de los derechos de autor o derechos e intereses y luego envíelos al correo electrónico: [email protected]. Lo manejaremos por usted lo antes posible.
Copyright© 2022 湘ICP备2022001581号-3