
kafka-go

Kafka library in Go
License: MIT License
Language: Go
Category: Cloud Computing, Cloud Native
Type: Open Source Software
Operating System: Cross-platform

Overview

kafka-go

Motivations

We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Go client libraries for Kafka at the time of this writing was not ideal. The available options were:

  • sarama, which is by far the most popular but is quite difficult to work with. It is poorly documented, the API exposes low level concepts of the Kafka protocol, and it doesn't support recent Go features like contexts. It also passes all values as pointers which causes large numbers of dynamic memory allocations, more frequent garbage collections, and higher memory usage.

  • confluent-kafka-go is a cgo based wrapper around librdkafka, which means it introduces a dependency to a C library on all Go code that uses the package. It has much better documentation than sarama but still lacks support for Go contexts.

  • goka is a more recent Kafka client for Go which focuses on a specific usage pattern. It provides abstractions for using Kafka as a message passing bus between services rather than an ordered log of events, but this is not the typical use case of Kafka for us at Segment. The package also depends on sarama for all interactions with Kafka.

This is where kafka-go comes into play. It provides both low and high level APIs for interacting with Kafka, mirroring concepts and implementing interfaces of the Go standard library to make it easy to use and integrate with existing software.

Note:

In order to better align with our newly adopted Code of Conduct, the kafka-go project has renamed our default branch to main.
For the full details of our Code Of Conduct see this document.

Migrating to 0.4

Version 0.4 introduces a few breaking changes to the repository structure which should have minimal impact on programs and should only manifest at compile time (the runtime behavior should remain unchanged).

  • Programs do not need to import compression packages anymore in order to read compressed messages from kafka. All compression codecs are supported by default.

  • Programs that used the compression codecs directly must be adapted. Compression codecs are now exposed in the compress sub-package.

  • The experimental kafka.Client API has been updated and slightly modified: the kafka.NewClient function and kafka.ClientConfig type were removed. Programs now configure the client values directly through exported fields (see the sketch after this list).

  • The kafka.(*Client).ConsumerOffsets method is now deprecated (along with the kafka.TopicAndGroup type), and will be removed when we release version 1.0. Programs should use the kafka.(*Client).OffsetFetch API instead.
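As a rough illustration of the new shape, here is a minimal sketch of configuring the client through its exported fields and fetching committed group offsets; the request and response field names shown are assumptions based on the 0.4 API and may differ in your version:

// configure the experimental client directly through exported fields
// (replaces kafka.NewClient and kafka.ClientConfig)
client := &kafka.Client{
    Addr:    kafka.TCP("localhost:9092"),
    Timeout: 10 * time.Second,
}

// fetch committed offsets for a consumer group (replaces the deprecated
// ConsumerOffsets method); the request shape here is an assumption
offsets, err := client.OffsetFetch(context.Background(), &kafka.OffsetFetchRequest{
    GroupID: "consumer-group-id",
    Topics:  map[string][]int{"topic-A": {0}},
})
if err != nil {
    log.Fatal("failed to fetch offsets:", err)
}
_ = offsets // inspect the per-topic results in a real program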

With 0.4, we know that we are starting to introduce a bit more complexity in the code, but the plan is to eventually converge towards a simpler and more effective API, allowing us to keep up with Kafka's ever growing feature set, and bringing a more efficient implementation to programs depending on kafka-go.

We truly appreciate everyone's input and contributions, which have made this project way more than what it was when we started it, and we're looking forward to receiving more feedback on where we should take it.

Kafka versions

kafka-go is currently compatible with Kafka versions from 0.10.1.0 to 2.1.0. While later versions should still work, some features available from the Kafka API may not be implemented yet.

Golang version

kafka-go is currently compatible with Go versions 1.15 and above. To use it with older versions of Go, use release v0.2.5.
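For reference, the library can be added to a Go modules project with go get (module path taken from the project's GitHub repository):

go get github.com/segmentio/kafka-go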

Connection

The Conn type is the core of the kafka-go package. It wraps around a raw network connection to expose a low-level API to a Kafka server.

Here are some examples showing typical use of a connection object:

// to produce messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
_, err = conn.WriteMessages(
    kafka.Message{Value: []byte("one!")},
    kafka.Message{Value: []byte("two!")},
    kafka.Message{Value: []byte("three!")},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

// to consume messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

To Create Topics

By default, Kafka has auto.create.topics.enable='true' (KAFKA_AUTO_CREATE_TOPICS_ENABLE='true' in the wurstmeister/kafka docker image). If this value is set to 'true' then topics will be created as a side effect of kafka.DialLeader like so:

// to create topics when auto.create.topics.enable='true'
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
    panic(err.Error())
}

If auto.create.topics.enable='false' then you will need to create topics explicitly like so:

// to create topics when auto.create.topics.enable='false'
topic := "my-topic"
partition := 0

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()


topicConfigs := []kafka.TopicConfig{
    kafka.TopicConfig{
        Topic:             topic,
        NumPartitions:     1,
        ReplicationFactor: 1,
    },
}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}

To Connect To Leader Via a Non-leader Connection

// to connect to the kafka leader via an existing non-leader connection rather than using DialLeader
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close()

To list topics

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}

for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}

Because it is low level, the Conn type turns out to be a great building block for higher level abstractions, like the Reader for example.

Reader

A Reader is another concept exposed by the kafka-go package, which intends to make it simpler to implement the typical use case of consuming from a single topic-partition pair. A Reader also automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts.

Note that it is important to call Close() on a Reader when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call Close() if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a signal.Notify handler to close the reader on process shutdown, as in the sketch after the example below.

// make a new reader that consumes from topic-A, partition 0, at offset 42
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     "topic-A",
    Partition: 0,
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})
r.SetOffset(42)

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}
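One way to wire this up is sketched below: a variant of the loop above that cancels the read context from a signal.Notify handler and closes the reader once the loop exits. It assumes the standard os, os/signal, and syscall packages are imported.

ctx, cancel := context.WithCancel(context.Background())

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
    <-sigs
    cancel() // pending ReadMessage calls return an error once the context is canceled
}()

for {
    m, err := r.ReadMessage(ctx)
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

// closing the reader gives the broker a graceful disconnect
if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}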

Consumer Groups

kafka-go also supports Kafka consumer groups including broker managed offsets. To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    GroupID:   "consumer-group-id",
    Topic:     "topic-A",
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

There are a number of limitations when using consumer groups:

  • (*Reader).SetOffset will return an error when GroupID is set
  • (*Reader).Offset will always return -1 when GroupID is set
  • (*Reader).Lag will always return -1 when GroupID is set
  • (*Reader).ReadLag will return an error when GroupID is set
  • (*Reader).Stats will return a partition of -1 when GroupID is set

Explicit Commits

kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For improved performance, you can instead periodically commit offsets to Kafka by setting CommitInterval on the ReaderConfig.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MinBytes:       10e3, // 10KB
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // flushes commits to Kafka every second
})

Writer

To produce messages to Kafka, a program may use the low-level Conn API, but the package also provides a higher level Writer type which is more appropriate to use in most cases as it provides additional features:

  • Automatic retries and reconnections on errors.
  • Configurable distribution of messages across available partitions.
  • Synchronous or asynchronous writes of messages to Kafka (an asynchronous sketch follows the example below).
  • Asynchronous cancellation using contexts.
  • Flushing of pending messages on close to support graceful shutdowns.

// make a writer that produces to topic-A, using the least-bytes distribution
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
	Topic:   "topic-A",
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}
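To illustrate the asynchronous mode mentioned in the feature list above, here is a non-authoritative sketch assuming the Writer's Async and Completion fields from the 0.4 API; with Async set, WriteMessages enqueues messages and returns without waiting for broker acknowledgements:

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
	Topic:    "topic-A",
	Balancer: &kafka.LeastBytes{},
	// Async makes WriteMessages return without waiting for the broker;
	// delivery results are reported through the Completion callback.
	Async: true,
	Completion: func(messages []kafka.Message, err error) {
		if err != nil {
			log.Println("async write failed:", err)
		}
	},
}

// enqueue a message; delivery errors surface via Completion rather than here
if err := w.WriteMessages(context.Background(),
	kafka.Message{Key: []byte("Key-A"), Value: []byte("async!")},
); err != nil {
	log.Fatal("failed to enqueue messages:", err)
}

// Close flushes any pending messages before returning.
if err := w.Close(); err != nil {
	log.Fatal("failed to close writer:", err)
}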

Writing to multiple topics

Normally, the WriterConfig.Topic is used to initialize a single-topic writer. By excluding that particular configuration, you are given the ability to define the topic on a per-message basis by setting Message.Topic.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
    // NOTE: When Topic is not defined here, each Message must define it instead.
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // NOTE: Each Message has Topic defined, otherwise an error is returned.
	kafka.Message{
        Topic: "topic-A",
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
        Topic: "topic-B",
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
        Topic: "topic-C",
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

NOTE: These two patterns are mutually exclusive: if you set Writer.Topic, you must not also explicitly define Message.Topic on the messages you are writing. The opposite applies when you do not define a topic for the writer. The Writer will return an error if it detects this ambiguity.

Compatibility with other clients

Sarama

If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can use the kafka.Hash balancer. kafka.Hash routes messages to the same partitions that Sarama's default partitioner would route to.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
	Topic:    "topic-A",
	Balancer: &kafka.Hash{},
}

librdkafka and confluent-kafka-go

Use the kafka.CRC32Balancer balancer to get the same behaviour as librdkafka's default consistent_random partition strategy.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
	Topic:    "topic-A",
	Balancer: kafka.CRC32Balancer{},
}

Java

Use the kafka.Murmur2Balancer balancer to get the same behaviour as the canonical Java client's default partitioner. Note: the Java class allows you to directly specify the partition which is not permitted.

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092"),
	Topic:    "topic-A",
	Balancer: kafka.Murmur2Balancer{},
}

Compression

Compression can be enabled on the Writer by setting the Compression field:

w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"),
	Topic:       "topic-A",
	Compression: kafka.Snappy,
}

The Reader will determine if the consumed messages are compressed by examining the message attributes. However, the package(s) for all expected codecs must be imported so that they get loaded correctly.

Note: in versions prior to 0.4, programs had to import compression packages to install codecs and support reading compressed messages from kafka. This is no longer the case, and imports of the compression packages are now no-ops.

TLS Support

For a bare bones Conn type or in the Reader/Writer configs you can specify a dialer option for TLS support. If the TLS field is nil, it will not connect with TLS.

Connection

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9093"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9093"},
	Topic:   "topic-A",
	Balancer: &kafka.Hash{},
	Dialer:   dialer,
})

SASL Support

You can specify an option on the Dialer to use SASL authentication. The Dialer can be used directly to open a Conn or it can be passed to a Reader or Writer via their respective configs. If the SASLMechanism field is nil, it will not authenticate with SASL.

SASL Authentication Types

Plain

mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}
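Assuming the plain mechanism above comes from the sasl/plain sub-package (import path assumed to be github.com/segmentio/kafka-go/sasl/plain), it is wired into a Dialer the same way as the SCRAM examples below:

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}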

SCRAM

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

Connection

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9093"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9093"},
	Topic:   "topic-A",
	Balancer: &kafka.Hash{},
	Dialer:   dialer,
})

Reading all messages within a time range

startTime := time.Now().Add(-time.Hour)
endTime := time.Now()
batchSize := int(10e6) // 10MB

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     "my-topic1",
    Partition: 0,
    MinBytes:  batchSize,
    MaxBytes:  batchSize,
})

r.SetOffsetAt(context.Background(), startTime)

for {
    m, err := r.ReadMessage(context.Background())

    if err != nil {
        break
    }
    if m.Time.After(endTime) {
        break
    }
    // TODO: process message
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

Testing

Subtle behavior changes in later Kafka versions have caused some historical tests to break. If you are running against Kafka 2.3.1 or later, exporting the KAFKA_SKIP_NETTEST=1 environment variable will skip those tests.

Run Kafka locally in docker

docker-compose up -d

Run tests

KAFKA_VERSION=2.3.1 \
  KAFKA_SKIP_NETTEST=1 \
  go test -race ./...