当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者再平衡耗时太长

徐鑫鹏
2023-03-14

我有一个Kafka流应用程序,它从几个主题中获取数据,并将数据加入另一个主题。

Kafka配置:

5 kafka brokers
Kafka Topics - 15 partitions and 3 replication factor. 

注意:我在运行Kafka Brokers的机器上运行Kafka Streams应用程序。

每小时消耗/产生数百万条记录。每当我让Kafka经纪人倒下时,都会进入再平衡阶段,再平衡大约需要30分钟,有时甚至更长时间。

有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候,它在重新平衡时抛出异常。

这将阻止我们使用此设置在生产环境中使用。任何帮助都将不胜感激。

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: ?
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)

Kafka流配置:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000

它在内部创建的用户配置是:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

共有2个答案

籍辰沛
2023-03-14

根据我的经验,首先是你的最高投票。考虑到您的工作量,记录太小:每小时消耗/生成的记录数不多。

所以如果max.poll.records太小,比如说1,那么再平衡需要很长时间。我不知道原因。

第二,确保输入主题到你的流应用程序的分区数是一致的。例如,如果APP-1有两个输入主题A和B。如果A有4个分区,而B有2个,那么重新平衡需要很长时间。然而,如果A和B都有4个分区,如果一些分区空闲,那么重新平衡时间是好的。希望有所帮助

许出野
2023-03-14

我建议通过参数num.standby.replicas=1(默认为0)配置备用任务。这将有助于显著减少重新平衡时间。

此外,我建议将您的应用程序升级到Kafka 0.11。注意,Streams API 0.11向后兼容0.10.1和0.10.2代理,因此,您不需要为此升级代理。再平衡行为在0.11中得到了极大改进,并将在即将发布的1.0版本中得到进一步改进(参见。https://cwiki.apache.org/confluence/display/KAFKA/KIP-167:因此,将应用程序升级到最新版本始终是重新平衡的改进。

 类似资料:
  • 有人能告诉我Kafka消费者的再平衡算法是什么吗?我想了解分区计数和消费者线程是如何影响这一点的。 非常感谢。

  • 我们正在运行一个3 broker Kafka 0.10.0.1集群。我们有一个java应用程序,它产生了许多消费线程,从不同的主题消费。对于每一个主题,我们都指定了不同的消费者群体。 很多时候,我看到每当这个应用程序重新启动时,一个或多个CG需要超过5分钟来接收分区分配。在此之前,这个话题的消费者不会消费任何东西。如果我去Kafka broker并运行Consumer-Groups.sh并描述特定

  • 在消费者重新平衡期间如何确保消息排序。假设最初我们有四个分区:p1、p2、p3、p4和两个消费者c1和c2(在同一组中)。因此每个消费者得到两个分区,例如c1 : p1,p2和c2 : p3,p4。 现在添加了新的消费者,比如c3和c4,重新平衡发生,这样每个消费者都有一个分区,比如c1: p1、c2: p2、c3: p3、c4: p4。 在此期间,消费者c1可能正在处理来自分区p2的消息(在重新

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 当一个新的消费者/borker被添加或下降时,Kafka会触发一个再平衡操作。Kafka是在重新平衡封锁行动。Kafka的消费者是不是在一个再平衡操作正在进行的时候就被封锁了?

  • 我对再平衡有些怀疑。现在,我正在手动将分区分配给使用者。因此,根据文件,如果消费者离开/崩溃在一个消费群体中,就不会有再平衡。 假设同一组中有3个分区和3个使用者,每个分区都是手动分配给每个使用者的。一段时间后,第三个消费者倒下了。既然没有再平衡,我可以采取什么措施来确保停机时间最小化?我是否需要更改前两个分区中任何一个的配置,以从第三个分区或其他分区开始使用?