如果您需要了解 Kafka 的基礎知識,例如它的主要功能、組件和優勢,我在這裡有一篇文章對此進行了介紹。請查看它並按照步驟操作,直到使用 Docker 完成 Kafka 安裝,然後繼續以下部分。
與Kafka與NodeJS連接的文章中的範例類似,該原始碼也包括兩部分:初始化一個生產者發送訊息 到Kafka 並使用 到Kafka 並使用消費者
訂閱來自[ 的訊息&&& &&&]
。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
包主 進口 ( “FMMT” “github.com/confluenceinc/confluence-kafka-go/kafka” ) 變數( 經紀人=“本地主機:9092” groupId = "組 ID" 主題 =“主題名稱” ) -這裡,套件github.com/confluenceinc/confluence-kafka-go/kafka
用於連接到Kafka。 - 經紀商
是主機地址;如果您使用ZooKeeper,請相應地替換主機地址。 - groupId
和topic
可以根據需要更改。
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": 代理}) 如果錯誤! = nil { 恐慌(錯誤) } 去函數(){ for e := 範圍 p.Events() { 開關 ev := e.(類型) { 案例*kafka.訊息: 如果 ev.TopicPartition.Error != nil { fmt.Printf("傳送失敗:%v\n", ev.TopicPartition) } 別的 { fmt.Printf("已將訊息傳送至 %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"訊息 1", "訊息 2", "訊息 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{主題: &topic, 分區: kafka.PartitionAny}, 值:[]位元組(字), },無) } } 上面的程式碼用於將訊息數組{"message 1", "message 2", "message 3"} 發送到主題並使用 go-routine 用
for e := range p.Events() 迭代事件並印出傳遞結果,無論是成功或失敗。
下一步是創建一個消費者來訂閱到主題
並接收
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
func startConsumer() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
「bootstrap.servers」:經紀人,
"group.id": 群組ID,
"auto.offset.reset": "最早",
})
如果錯誤! = nil {
恐慌(錯誤)
}
c.訂閱(主題,nil)
為了 {
訊息,錯誤:= c.ReadMessage(-1)
如果錯誤==零{
fmt.Printf("%s 上的訊息:%s\n", msg.TopicPartition, string(msg.Value))
} 別的 {
fmt.Printf("消費者錯誤: %v (%v)\n", err, msg)
休息
}
}
c.關閉()
}
最後,由於這是一個簡單的範例,因此呼叫函數建立生產者和消費者以供使用。在現實場景中,生產者和消費者
的部署通常在
func main() { startProducer() startConsumer() }
func main() {
啟動生產者()
啟動消費者()
}
編碼愉快!
如果您覺得本文內容有幫助,請訪問我部落格上的原文,支持作者,探索更多有趣的內容。
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3