Confluent's Golang Client for Apache Kafka

宋瀚海
2023-12-01

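Note: A three-person team left LinkedIn to found a startup. They were the engineers who built Apache Kafka, the real-time message queue technology, and on that foundation Jay Kreps led the creation of a new company, Confluent. Confluent's products are built around Kafka. What is Confluent Platform? It is a streaming data platform that organizes and manages data from many different sources on a stable, efficient system.

Confluent's Go client for Kafka (confluent-kafka-go) is fairly full-featured, but it depends on the librdkafka development package (librdkafka-dev on Ubuntu, librdkafka-devel on CentOS). The installation steps are recorded below; https://github.com/confluentinc/confluent-kafka-go has more detail: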

I. Install librdkafka

Ubuntu: first add Confluent's Deb repository, then install with: apt-get install librdkafka-dev

Adding Confluent's Deb repository:

a. Add the apt key

  wget -qO - https://packages.confluent.io/deb/4.0/archive.key | sudo apt-key add -

b. Add the repository to /etc/apt/sources.list

  add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/4.0 stable main"

c. Run apt-get update to refresh the package index

apt-get update

CentOS 7: add the yum repository, then run yum install librdkafka-devel

Adding the yum repository:

a. Import the key file

 rpm --import https://packages.confluent.io/rpm/4.0/archive.key

b. Add the repository to your /etc/yum.repos.d/ directory in a file named confluent.repo

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1

c. sudo yum clean all

d. yum install librdkafka-devel

II. Install confluent-kafka-go

1. Fetch it directly into GOPATH with go get

   

go get -u github.com/confluentinc/confluent-kafka-go/kafka

2. Alternatively, download it and copy it into the dependency directory under GOPATH

III. Test with a program

// Example function-based Apache Kafka producer
package main

/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    if len(os.Args) != 3 {
        fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
            os.Args[0])
        os.Exit(1)
    }

    broker := os.Args[1]
    topic := os.Args[2]

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

    if err != nil {
        fmt.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Producer %v\n", p)

    // Optional delivery channel, if not specified the Producer object's
    // .Events channel is used.
    deliveryChan := make(chan kafka.Event)

    value := "Hello Go!"
    err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)
    if err != nil {
        // The message was never enqueued, so no delivery report will
        // arrive; bail out instead of blocking on the channel below.
        fmt.Printf("Failed to enqueue message: %s\n", err)
        os.Exit(1)
    }

    // Block until the delivery report for the message arrives.
    e := <-deliveryChan
    m := e.(*kafka.Message)

    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

If the output reports a successful delivery, the whole setup is working correctly.

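Note: the machine that runs the program also needs librdkafka installed (the librdkafka-dev package here), unless the library is linked statically: go build -tags static

References: https://github.com/confluentinc/confluent-kafka-go

       https://www.cnblogs.com/lanyangsh/p/7782771.html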
