如果您需要了解 Kafka 的基础知识,例如它的主要功能、组件和优势,我在这里有一篇文章对此进行了介绍。请查看它并按照步骤操作,直到使用 Docker 完成 Kafka 安装,然后继续以下部分。
与Kafka与NodeJS连接的文章中的示例类似,该源码也包括两部分:初始化一个生产者 发送 消息 到 Kafka 并使用 消费者 订阅来自 [ 的消息&&&]话题。
我将把代码分解成更小的部分以便更好地理解。首先,让我们定义变量值。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )-这里,包
github.com/confluenceinc/confluence-kafka-go/kafka用于连接到Kafka。
-经纪商是主机地址;如果您使用ZooKeeper,请相应地替换主机地址。
-groupId和topic可以根据需要更改。
接下来是初始化生产者。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )上面的代码用于将消息数组
{"message 1", "message 2", "message 3"} 发送到主题并使用 go-routine 用 for e := range p.Events() 迭代事件并打印出传递结果,无论是成功或失败。
下一步是创建一个消费者来订阅到主题并接收消息。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )最后,由于这是一个简单的示例,因此调用函数创建
生产者和消费者以供使用。在现实场景中,生产者和消费者的部署通常在微服务系统中的两个不同服务器上完成。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
编码愉快!
如果您觉得本文内容有帮助,请访问我博客上的原文,支持作者,探索更多有趣的内容。
免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。
Copyright© 2022 湘ICP备2022001581号-3