当前位置: 首页 > 知识库问答 >
问题:

go - GO的Kafka的问题,Local: Queue full?

楚承天
2023-06-20

今天线上报错Local: Queue full,导致接口无法使用,请求各位大佬指教
目前使用go连接kafka的库使用的是github.com/confluentinc/confluent-kafka-go/v2/kafka

问题:在我连续像生产者写入110万条数据时,报错 Local: Queue full,具体代码如下:

package kfk

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "strings"
    "time"
)

// 发送消息到指定主题
func SendMessage(broker, topic string, tableName string, message []byte) error {
    config := &kafka.ConfigMap{
        "bootstrap.servers":   strings.Join([]string{"localhost:9092"}, ","),
        "acks":                "1",
        "delivery.timeout.ms": 3000,
        "security.protocol":   "PLAINTEXT",
    }
    producer, err := kafka.NewProducer(config)
    if err != nil {
        
    }

    err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic: &topic,
        },
        Key:       []byte(tableName),
        Value:     message,
        Timestamp: time.Now(),
    }, nil)
    if err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}

经过多次的测试,不管是不是进行消费,都是在110万条左右开始报错,并且不在之后的数据不能写入,但是重启服务就好了,分析原因:github.com/confluentinc/confluent-kafka-go 这个包使用了队列的概念,累计到110万时,超出了队列的缓存区,所以不能再继续写入,而重启服务则清空了这个缓冲区,让服务可用,但是再累计到110万时,服务还会不可用。

各位大佬,这是这个库的Bug么?是作者特意做的,还是我的配置不正确,可以通过什么配置可以解决呢?

有没有更好一点的kafka库呢? 其他库会有这种问题吗?

共有1个答案

暴向笛
2023-06-20

这个库其实是包装了一下 c 的实现,最终其实这个报错应该也是 c 的库里返回的,并不是这个库本身的意思 详情见:https://github.com/confluentinc/librdkafka/blob/aa50e52a12bece3e399f69b8477fd0c8aadbfff1/src/rdkafka.c#L430

我就不追 C 的源码了,大胆来猜测一下,估计这个报错就是因为本地队列满了导致的(kafka 客户端在发送消息的时候,并不是收到之后马上就发送出去的,而是攒起来,一批一批发)。这样的话应该会有两种思路,一种是库里面本身支持了某个配置可以修改本地队列最大数量,或者 buffer 最大数量类似的参数;还有就是本身不提供这样的参数调整,通过前面的限流完成。

顺着这个思路去找一下文档,发现有
https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md

Compression
Producer message compression is enabled through the compression.codec configuration property.

Compression is performed on the batch of messages in the local queue, the larger the batch the higher likelyhood of a higher compression ratio. The local batch queue size is controlled through the batch.num.messages, batch.size, and linger.ms configuration properties as described in the High throughput chapter above.

估计就是 batch.num.messages 这个配置了,这个库没用过,所以不知道包装之后有没有这个配置,你可以找一下。

 类似资料:
  • 首先我使用了 "github.com/confluentinc/confluent-kafka-go/kafka" 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: 十分不理解,这是什么原因呢?换成kafka之后,不能进行交叉编译么?请各位

  • 一个奇怪的问题,大佬们,这个问题我不清楚是怎么发生的,想求问一下是否有大佬知道如何解决

  • kafka-go Motivations We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Goclient libraries for Kafka at the time of this writing was not ideal. The availableoptions were: s

  • 如代码所示 目的都是像间隔3s执行一次 但是实际执行的时候 1 执行一次要比 2 间隔更久 因为最近在弄一个线上项目 这个周期本来是设定10s的最后变成了 一分多钟才执行一次 好奇是什么问题导致的?

  • github。com/gogo/protobuf/proto[ok]github。com/gogo/protobuf/protoc-gen-gogo[ok]github。com/gogo/protobuf/gogoproto[确定]谷歌。戈朗。组织/grpc[确定] 协议--gogo_out=. greet 请帮帮我

  • 我正在创建一个自定义记录器,在这里我们可以登录到std out和std err,但也可以登录到kafka(代码示例如下:https://github.com/roppa/kafka-go). 我们有多个主题,所以我们需要多个记录器,但当我们使用多个时,就会发生一些奇怪的事情。当Kafka围棋的两个设置都是异步的时,我不会收到消费者消息,当一个是异步的,另一个是同步的时,我们会得到如下结果: 更改同