"일꾼이 일을 잘하려면 먼저 도구를 갈고 닦아야 한다." - 공자, 『논어』.
첫 장 > 프로그램 작성 > Kafka를 Golang과 연결

Kafka를 Golang과 연결

2024-11-04에 게시됨
검색:247

소개

주요 기능, 구성 요소, 장점 등 Kafka의 기본 사항을 알고 싶다면 여기에서 해당 내용을 다루는 기사를 참조하세요. 이를 검토하고 Docker를 사용하여 Kafka 설치를 완료할 때까지 단계를 따라 다음 섹션을 진행하세요.

Connect Kafka with Golang

Golang을 사용하여 Kafka에 연결

KafkaNodeJS와 연결하는 방법에 대한 기사의 예와 유사하게 이 소스 코드에는 두 부분도 포함되어 있습니다. 생산자 메시지Kafka로 보내고 소비자를 사용하여 [의 메시지를 구독합니다. &&&]주제.

더 나은 이해를 위해 코드를 더 작은 부분으로 나누겠습니다. 먼저 변수 값을 정의해 보겠습니다.


패키지 메인 수입 ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( 브로커 = "localhost:9092" groupId = "그룹 ID" 주제 = "주제 이름" )
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
- 여기서

github.com/confluentinc/confluent-kafka-go/kafka 패키지는 Kafka.에 연결하는 데 사용됩니다.

-

브로커는 호스트 주소입니다. ZooKeeper를 사용하는 경우 그에 따라 호스트 주소를 바꾸십시오.

-

그룹 ID주제는 필요에 따라 변경될 수 있습니다.

다음은 생산자를 초기화합니다.


func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": 브로커}) 오류가 있는 경우 != nil { 패닉(err) } go func() { for e := 범위 p.Events() { 스위치 ev := e.(유형) { 사례 *kafka.메시지: ev.TopicPartition.Error != nil {인 경우 fmt.Printf("배달 실패: %v\n", ev.TopicPartition) } 또 다른 { fmt.Printf("%v에 메시지를 전달했습니다\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.메시지{ TopicPartition: kafka.TopicPartition{주제: &topic, 파티션: kafka.PartitionAny}, 값: []바이트(워드), }, 없음) } }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
위 코드는 메시지 배열

{"message 1", "message 2", "message 3"}를 주제로 보내는 데 사용되며 go-routine for e := range p.Events()를 사용하여 이벤트를 반복하고 배송 결과를 인쇄합니다. 성공 또는 실패.

다음은

주제구독하고 메시지를 수신하는 소비자를 생성하는 것입니다.

func startConsumer() { c, 오류 := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": 브로커, "group.id": 그룹 ID, "auto.offset.reset": "가장 빠른", }) 오류가 있는 경우 != nil { 패닉(err) } c.구독(주제, 없음) 을 위한 { 메시지, 오류 := c.ReadMessage(-1) 오류 == nil인 경우 { fmt.Printf("%s의 메시지: %s\n", msg.TopicPartition, string(msg.Value)) } 또 다른 { fmt.Printf("소비자 오류: %v (%v)\n", err, msg) 부서지다 } } c.닫기() }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
마지막으로 간단한 예제이므로 함수를 호출하여 사용할

생산자소비자를 생성합니다. 실제 시나리오에서 생산자소비자 배포는 일반적으로 마이크로서비스 시스템의 서로 다른 두 서버에서 수행됩니다.

func main() { 시작프로듀서() 시작소비자() }
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)

Connect Kafka with Golang

즐거운 코딩하세요!


이 콘텐츠가 도움이 되었다면 내 블로그의 원본 기사를 방문하여 작성자를 지원하고 더 흥미로운 콘텐츠를 탐색해 보세요.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


흥미로울 만한 시리즈:

    NodeJS
  •  반응
  • 도커 
  • 쿠버네티스
릴리스 선언문 이 글은 https://dev.to/chauhoangminhnguyen/connect-kafka-with-golang-3h4d?1에서 복제됩니다.1 침해 내용이 있는 경우, [email protected]으로 연락하여 삭제하시기 바랍니다.
최신 튜토리얼 더>

부인 성명: 제공된 모든 리소스는 부분적으로 인터넷에서 가져온 것입니다. 귀하의 저작권이나 기타 권리 및 이익이 침해된 경우 자세한 이유를 설명하고 저작권 또는 권리 및 이익에 대한 증거를 제공한 후 이메일([email protected])로 보내주십시오. 최대한 빨리 처리해 드리겠습니다.

Copyright© 2022 湘ICP备2022001581号-3