」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 連接 Kafka 和 Golang

連接 Kafka 和 Golang

發佈於2024-11-04
瀏覽:116

介紹

如果您需要了解 Kafka 的基礎知識,例如它的主要功能、組件和優勢,我在這裡有一篇文章對此進行了介紹。請查看它並按照步驟操作,直到使用 Docker 完成 Kafka 安裝,然後繼續以下部分。

Connect Kafka with Golang

使用 Golang 連線到 Kafka

KafkaNodeJS連接的文章中的範例類似,該原始碼也包括兩部分:初始化一個生產者發送訊息Kafka 並使用Kafka 並使用消費者

訂閱來自

[ 的訊息&&& &&&]

package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

包主 進口 ( “FMMT” “github.com/confluenceinc/confluence-kafka-go/kafka” ) 變數( 經紀人=“本地主機:9092” groupId = "組 ID" 主題 =“主題名稱” ) -這裡,套件github.com/confluenceinc/confluence-kafka-go/kafka

用於連接到

Kafka - 經紀商

是主機地址;如果您使用

ZooKeeper,請相應地替換主機地址。 - groupId

topic
可以根據需要更改。

func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}

func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": 代理}) 如果錯誤! = nil { 恐慌(錯誤) } 去函數(){ for e := 範圍 p.Events() { 開關 ev := e.(類型) { 案例*kafka.訊息: 如果 ev.TopicPartition.Error != nil { fmt.Printf("傳送失敗:%v\n", ev.TopicPartition) } 別的 { fmt.Printf("已將訊息傳送至 %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"訊息 1", "訊息 2", "訊息 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{主題: &topic, 分區: kafka.PartitionAny}, 值:[]位元組(字), },無) } } 上面的程式碼用於將訊息數組{"message 1", "message 2", "message 3"} 發送到主題並使用 go-routine

for e := range p.Events()

迭代事件並印出傳遞結果,無論是成功或失敗。 下一步是創建一個消費者訂閱主題
並接收

訊息
func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}

func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ 「bootstrap.servers」:經紀人, "group.id": 群組ID, "auto.offset.reset": "最早", }) 如果錯誤! = nil { 恐慌(錯誤) } c.訂閱(主題,nil) 為了 { 訊息,錯誤:= c.ReadMessage(-1) 如果錯誤==零{ fmt.Printf("%s 上的訊息:%s\n", msg.TopicPartition, string(msg.Value)) } 別的 { fmt.Printf("消費者錯誤: %v (%v)\n", err, msg) 休息 } } c.關閉() } 最後,由於這是一個簡單的範例,因此呼叫函數建立生產者消費者以供使用。在現實場景中,生產者消費者
的部署通常在

微服務
func main() {
  startProducer()
  startConsumer()
}

Connect Kafka with Golangfunc main() { 啟動生產者() 啟動消費者() }

編碼愉快!

如果您覺得本文內容有幫助,請訪問我部落格上的原文,支持作者,探索更多有趣的內容。

Connect Kafka with GolangConnect Kafka with Golang Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


  • 您可能會感興趣的一些系列:
  • NodeJS
  •  反應
  • Docker 
Kubernetes

版本聲明 本文轉載於:https://dev.to/chauhoangminhnguyen/connect-kafka-with-golang-3h4d?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 如何有效地轉換PHP中的時區?
    如何有效地轉換PHP中的時區?
    在PHP 利用dateTime對象和functions DateTime對象及其相應的功能別名為時區轉換提供方便的方法。例如: //定義用戶的時區 date_default_timezone_set('歐洲/倫敦'); //創建DateTime對象 $ dateTime = ne...
    程式設計 發佈於2025-04-14
  • FastAPI自定義404頁面創建指南
    FastAPI自定義404頁面創建指南
    response = await call_next(request) if response.status_code == 404: return RedirectResponse("https://fastapi.tiangolo.com") else: ...
    程式設計 發佈於2025-04-14
  • 如何配置Pytesseract以使用數字輸出的單位數字識別?
    如何配置Pytesseract以使用數字輸出的單位數字識別?
    Pytesseract OCR具有單位數字識別和僅數字約束 在pytesseract的上下文中,在配置tesseract以識別單位數字和限制單個數字和限制輸出對數字可能會提出質疑。 To address this issue, we delve into the specifics of Te...
    程式設計 發佈於2025-04-14
  • 如何為PostgreSQL中的每個唯一標識符有效地檢索最後一行?
    如何為PostgreSQL中的每個唯一標識符有效地檢索最後一行?
    postgresql:為每個唯一標識符提取最後一行,在Postgresql中,您可能需要遇到與在數據庫中的每個不同標識相關的信息中提取信息的情況。考慮以下數據:[ 1 2014-02-01 kjkj 在數據集中的每個唯一ID中檢索最後一行的信息,您可以在操作員上使用Postgres的有效效率: ...
    程式設計 發佈於2025-04-14
  • 如何在Java字符串中有效替換多個子字符串?
    如何在Java字符串中有效替換多個子字符串?
    在java 中有效地替換多個substring,需要在需要替換一個字符串中的多個substring的情況下,很容易求助於重複應用字符串的刺激力量。 However, this can be inefficient for large strings or when working with nu...
    程式設計 發佈於2025-04-14
  • 我可以將加密從McRypt遷移到OpenSSL,並使用OpenSSL遷移MCRYPT加密數據?
    我可以將加密從McRypt遷移到OpenSSL,並使用OpenSSL遷移MCRYPT加密數據?
    將我的加密庫從mcrypt升級到openssl 問題:是否可以將我的加密庫從McRypt升級到OpenSSL?如果是這樣,如何? 答案:是的,可以將您的Encryption庫從McRypt升級到OpenSSL。 可以使用openssl。 附加說明: [openssl_decrypt()函數要求...
    程式設計 發佈於2025-04-14
  • Node.js在多核機器上的擴展方法
    Node.js在多核機器上的擴展方法
    在多核機器上的node.js可擴展性 node.js上的可擴展性,儘管它具有單線程 - 播放設計,但確實確實在多芯CPUS和MOURTI-CPU CPU SERVER和MORTI-CPU SERVERSRY機制上確實有效地縮放了量表。 Node.jsTo leverage multiple cor...
    程式設計 發佈於2025-04-14
  • 如何使用Java.net.urlConnection和Multipart/form-data編碼使用其他參數上傳文件?
    如何使用Java.net.urlConnection和Multipart/form-data編碼使用其他參數上傳文件?
    使用http request 上傳文件上傳到http server,同時也提交其他參數,java.net.net.urlconnection and Multipart/form-data Encoding是普遍的。 Here's a breakdown of the process:Mu...
    程式設計 發佈於2025-04-14
  • MySQL中AND與OR的運算優先級如何影響查詢結果?
    MySQL中AND與OR的運算優先級如何影響查詢結果?
    在mySQL 在MySQL中,操作員的優先級確定評估操作的順序。 MySQL文檔提供了操作員優先級別的綜合列表,列表的頂部運算符具有更高的優先級。 考慮以下查詢:根據文檔,OR和和運營商具有相同的優先級。 Therefore, this query can be interpreted in tw...
    程式設計 發佈於2025-04-14
  • 如何使用Python的請求和假用戶代理繞過網站塊?
    如何使用Python的請求和假用戶代理繞過網站塊?
    如何使用Python的請求模擬瀏覽器行為,以及偽造的用戶代理提供了一個用戶 - 代理標頭一個有效方法是提供有效的用戶式header,以提供有效的用戶 - 設置,該標題可以通過browser和Acterner Systems the equestersystermery和操作系統。通過模仿像Chro...
    程式設計 發佈於2025-04-14
  • 如何使用“ JSON”軟件包解析JSON陣列?
    如何使用“ JSON”軟件包解析JSON陣列?
    parsing JSON與JSON軟件包 QUALDALS:考慮以下go代碼:字符串 } func main(){ datajson:=`[“ 1”,“ 2”,“ 3”]`` arr:= jsontype {} 摘要:= = json.unmarshal([] byte(...
    程式設計 發佈於2025-04-14
  • 现代CPU上浮点除法与乘法:除法仍显著慢吗?
    现代CPU上浮点除法与乘法:除法仍显著慢吗?
    在編程領域中,浮動點在編程領域中,了解浮點操作之間的細微差別對於性能優化至關重要。 While many associate floating-point division as being much slower than multiplication, this article delves ...
    程式設計 發佈於2025-04-14
  • 為什麼我在Silverlight Linq查詢中獲得“無法找到查詢模式的實現”錯誤?
    為什麼我在Silverlight Linq查詢中獲得“無法找到查詢模式的實現”錯誤?
    查詢模式實現缺失:解決“無法找到”錯誤在銀光應用程序中,嘗試使用LINQ建立錯誤的數據庫連接的嘗試,無法找到以查詢模式的實現。 ”當省略LINQ名稱空間或查詢類型缺少IEnumerable 實現時,通常會發生此錯誤。 解決問題來驗證該類型的質量是至關重要的。在此特定實例中,tblpersoon可能...
    程式設計 發佈於2025-04-14
  • JavaScript日期比較:等於與大小關係
    JavaScript日期比較:等於與大小關係
    在處理JavaScript中的日期時,將日期與JavaScript進行比較:超越equalality ,以大於,小於,非Past值的比較對於各種應用程序至關重要。文本框輸入提供了一種方便的方式來收集日期,但是我們需要探索如何有效比較它們。 要比較等式,請使用date.getTime()。如下示例所...
    程式設計 發佈於2025-04-14
  • 為什麼PYTZ最初顯示出意外的時區偏移?
    為什麼PYTZ最初顯示出意外的時區偏移?
    與pytz 最初從pytz獲得特定的偏移。例如,亞洲/hong_kong最初顯示一個七個小時37分鐘的偏移: 差異源利用本地化將時區分配給日期,使用了適當的時區名稱和偏移量。但是,直接使用DateTime構造器分配時區不允許進行正確的調整。 example pytz.timezone(&#...
    程式設計 發佈於2025-04-14

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3