当前位置: 首页 > 知识库问答 >
问题:

在broker重新启动很长时间后加载偏移量和元数据块KafkaConsumer

严远
2023-03-14

我们有一个问题,有时调用‘轮询’方法的新KafkaConsumer挂起长达20到30分钟后,三个kafka经纪人中的一个得到重新启动!

我们使用的是3 broker kafka设置(0.9.0.1)。我们的消费者进程使用新的Java KafkaConsumer-API,并且我们将分配给特定的TopicPartition。

由于不同的原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的:

Properties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id="consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0)));

while (true) {

  // THIS CALL sometimes blocks for a very long Time after a broker restart
  ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);

  Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
  while (recordIter.hasNext()) {                
     ConsumerRecord<String, byte[]> record = recordIter.next();

     // Very fast, actually just sending a UDP Paket via Netty.
     processRecord(record); 

     if (lastCommitHappendFiveOrMoreSecondsAgo()) {   
       kafkaConsumer.commitAsync();
     }
  }
}

kafka-topics.sh描述__consumer_offsets主题,如下所示

Topic:__consumer_offsets    PartitionCount:50   
ReplicationFactor:3 Configs:segment.bytes=104857600,
cleanup.policy=compact,compression.type=uncompressed

重新启动的代理的server.log显示,从__consumer_offsets主题的特定分区加载偏移量需要很长时间(在本例中大约22分钟)。这与用户的“轮询”调用被阻止的时间相关。

[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.

我想知道是什么使得加载过程如此缓慢,该怎么做呢?

共有1个答案

曹渝
2023-03-14

找到原因了。

代理的server.xml配置文件包含属性

log.cleaner.enable=false

(默认情况下,对于版本0.9.0.1,该属性为true)这意味着kafkas内部compacted__consumer_offsets主题实际上没有被压缩,因为禁用了log-cleaner。实际上,本主题的一些分区增长到了几千兆字节的大小,这就解释了当一个新的组协调器需要重新填充它的缓存时,读取所有消费者偏移数据所需要的时间。

 类似资料:
  • 我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这

  • 但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?

  • 问题内容: 我正在使用以下代码: xlsx文件本身具有25,000行,每行包含500列的内容。在调试过程中,我看到创建XSSFWorkbook的第三行需要很长时间(1小时!)来完成此语句。 有没有更好的方法来访问原始xlsx文件的值? 问题答案: 首先,当您有文件时,不要从InputStream加载XSSFWorkbook!使用InputStream需要将所有内容缓冲到内存中,这会占用空间并占用时

  • 我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?

  • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350

  • 我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了