「労働者が自分の仕事をうまくやりたいなら、まず自分の道具を研ぎ澄まさなければなりません。」 - 孔子、「論語。陸霊公」
表紙 > プログラミング > Kafka と Golang を接続する

Kafka と Golang を接続する

2024 年 11 月 4 日に公開
ブラウズ:346

導入

主要な機能、コンポーネント、利点など、Kafka の基本を知る必要がある場合は、ここにそれを説明した記事があります。 Docker を使用して Kafka のインストールを完了するまで確認し、次のセクションに進んでください。

Connect Kafka with Golang

Golang を使用して Kafka に接続する

KafkaNodeJS の接続に関する記事の例と同様に、このソース コードにも 2 つの部分が含まれています: プロデューサー の初期化] を使用して メッセージKafka に送信し、コンシューマー を使用して [ からのメッセージを購読します。 &&&]トピック

理解を助けるために、コードを小さな部分に分割します。まず、変数の値を定義しましょう。


パッケージメイン 輸入 ( 「fmt」 「github.com/confluentinc/confluent-kafka-go/kafka」 ) var ( ブローカー = "ローカルホスト:9092" groupId = "グループID" topic = "トピック名" )
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
- ここでは、パッケージ

github.com/confluentinc/confluent-kafka-go/kafkaKafka に接続するために使用されます。

-

ブローカーはホスト アドレスです。 ZooKeeper を使用している場合は、ホスト アドレスを適宜置き換えてください。

-

groupId および topic は、必要に応じて変更できます。

次はプロデューサーの初期化です。


func startProducer() { p、err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": ブローカー}) エラーの場合 != nil { パニック(えー) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.メッセージ: if ev.TopicPartition.Error != nil { fmt.Printf("配信失敗: %v\n", ev.TopicPartition) } それ以外 { fmt.Printf("メッセージを %v に配信しました\n", ev.TopicPartition) } } } }() for _, word := range []string{"メッセージ 1", "メッセージ 2", "メッセージ 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{トピック: &topic、パーティション: kafka.PartitionAny}、 値: []バイト(ワード)、 }、なし) } }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
上記のコードは、メッセージ

{"message 1", "message 2", "message 3"} の配列をトピックに送信するために使用され、go-routine: for e := range p.Events() を使用してイベントを反復処理し、配信結果を出力します。成功か失敗か。

次は、

トピックサブスクライブし、メッセージを受信するコンシューマーを作成します。

func startConsumer() { c、エラー := kafka.NewConsumer(&kafka.ConfigMap{ 「bootstrap.servers」: ブローカー、 "group.id": グループID、 "auto.offset.reset": "最も早い", }) エラーの場合 != nil { パニック(えー) } c.Subscribe(トピック、nil) のために { メッセージ、エラー := c.ReadMessage(-1) if err == nil { fmt.Printf("%s のメッセージ: %s\n", msg.TopicPartition, string(msg.Value)) } それ以外 { fmt.Printf("コンシューマ エラー: %v (%v)\n", err, msg) 壊す } } c.Close() }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
最後に、これは簡単な例なので、使用する

ProducerConsumer を作成する関数を呼び出します。実際のシナリオでは、プロデューサーコンシューマーのデプロイは通常、マイクロサービスシステム内の 2 つの異なるサーバーで行われます。

関数 main() { startProducer() startConsumer() }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

Connect Kafka with Golang

コーディングを楽しんでください!


このコンテンツが役立つと思われた場合は、著者をサポートし、より興味深いコンテンツを探索するために、私のブログの元の記事にアクセスしてください。

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


あなたが興味を持ちそうなシリーズ:

    NodeJS
  • 反応する
  • ドッカー
  • Kubernetes
リリースステートメント この記事は次の場所に転載されています: https://dev.to/chauhoangminhnguyen/connect-kafka-with-golang-3h4d?1 侵害がある場合は、[email protected] に連絡して削除してください。
最新のチュートリアル もっと>

免責事項: 提供されるすべてのリソースの一部はインターネットからのものです。お客様の著作権またはその他の権利および利益の侵害がある場合は、詳細な理由を説明し、著作権または権利および利益の証拠を提出して、電子メール [email protected] に送信してください。 できるだけ早く対応させていただきます。

Copyright© 2022 湘ICP备2022001581号-3