当前位置: 首页 > 知识库问答 >
问题:

异步自动提交偏移量失败

贺高飞
2023-03-14

我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息:

Asynchronous auto-commit of offsets {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000毫秒)短得多。第三,在一次失控的Kafka故障中,我至少收到了30K条这种类型的消息,通过重新启动进程解决了这一问题。为什么会出现这种情况?

我在这里列出了我的消费者配置

allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = latest
        bootstrap.servers = [kafka1-eu.dev.com:9094, kafka2-eu.dev.com:9094, kafka3-eu.dev.com:9094]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = feature-cs-1915-2553221872080030
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 15000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = /home/me/feature-2553221872080030.keystore
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /home/me/feature-2553221872080030.truststore
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2

共有1个答案

叶光华
2023-03-14

第一,我不明白有什么好犯的?

您是对的,如果没有新的数据流动,就没有什么新的东西要提交。但是,如果启用了auto.commit,并且您的使用者仍在运行(即使无法连接到broker),则轮询方法仍然负责以下步骤:

  • 从分配的分区获取消息
  • 触发器分区分配(如果需要)
  • 如果启用了自动偏移量提交,则提交偏移量

其次,断开时间为30秒,大大少于5分钟(300000毫秒)的max.poll.interval.ms

导致重新平衡的不是max.poll.interval,而是heartbeat.interval.ms设置和session.timeout.ms的组合。您的使用者在后台线程中发送基于间隔设置的心跳,在您的情况下为3秒。如果在此会话超时到期之前(在您的情况中为15秒),代理没有接收到心跳,则代理将从组中删除此客户端并启动重新平衡。

关于我提到的配置的更详细的描述在关于消费者配置的Kafka文档中给出

第三,在Kafka的一次失控故障中,我得到了至少30K条这种类型的消息,通过重新启动进程解决了这一问题。为什么会出现这种情况?

这似乎是前两个问题的组合,在这两个问题中,无法发送心跳,而消费者仍然试图通过连续调用的poll方法提交。

正如@GaryRussell在他的评论中提到的,我会小心地使用auto.commit.enabled,而是将偏移量管理的控制权交给自己。

 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我使用的是Spring Kafka 1.2.2版。我有一个Kafka Listener作为消费者,它监听一个主题并在弹性中索引文档。我的自动提交偏移量属性设置为true//default。

  • 我正试着把我的头绕在Kafka的交易上,而且只绕了一次。 我已经创建了一个事务性消费者,我想确保阅读和处理某个主题的所有消息。如果事务失败,消息因此丢失,Kafka仍会提交偏移量。 更正式地说,如果流处理应用程序使用消息A并生成消息B,使得B=F(A),那么恰好一次处理意味着当且仅当成功生成B时才认为A被消耗,反之亦然。来源 基于此,我假设消息A没有被消费,因此将再次被重新处理。但这条信息将如何重

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端