注:LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka做的。什么是Confluent Platform?Confluent Platform 是一个流数据平台,能够组织管理来自不同数据源的数据,拥有稳定高效的系统。
Confluent公司的go-kafka-client功能比较全面,但是依赖librdkafka-devel包,现在记录一下安装步骤,也可参考https://github.com/confluentinc/confluent-kafka-go 更加详细:
一、安装librdkafka
ubuntu:先安装 Confluent's Deb repository,然后用命令:apt-get install librdkafka-dev 进行安装即可.
Confluent's Deb repository添加方法
a、添加apt-key
wget -qO - https://packages.confluent.io/deb/4.0/archive.key | sudo apt-key add -
b、添加 repository 到 /etc/apt/sources.list
ad add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/4.0 stable main”
c、apt-get update 进行更新
apt-get update
CentOS7:安装yum 源,然后yum install librdkafka-devel
yum源安装方法:
a、下载key文件
rpm --import https://packages.confluent.io/rpm/4.0/archive.key
b、Add the repository to your /etc/yum.repos.d/ directory in a file named confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1
c、sudo yum clean all
d、yum install librdkafka-devel
二、安装confluent-kafka-go包
1、直接go get到gopath
go get -u github.com/confluentinc/confluent-kafka-go/kafka
2、下载后复制到gopath依赖里面
三、程序进行测试
// Example function-based Apache Kafka producer
package main
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
)
func main() {
if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
os.Args[0])
os.Exit(1)
}
broker := os.Args[1]
topic := os.Args[2]
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Producer %v\n", p)
// Optional delivery channel, if not specified the Producer object's
// .Events channel is used.
deliveryChan := make(chan kafka.Event)
value := "Hello Go!"
err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
}
输出传送成功则说明整个过程正确无误
注:运行环境也要librdkafka-dev包但是可以进行静态编译:go build -tags static
参考:https://github.com/confluentinc/confluent-kafka-go
https://www.cnblogs.com/lanyangsh/p/7782771.html