我们有一个问题,有时调用‘轮询’方法的新KafkaConsumer挂起长达20到30分钟后,三个kafka经纪人中的一个得到重新启动!
我们使用的是3 broker kafka设置(0.9.0.1)。我们的消费者进程使用新的Java KafkaConsumer-API,并且我们将分配给特定的TopicPartition。
由于不同的原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的:
Properties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id="consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0)));
while (true) {
// THIS CALL sometimes blocks for a very long Time after a broker restart
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);
Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
while (recordIter.hasNext()) {
ConsumerRecord<String, byte[]> record = recordIter.next();
// Very fast, actually just sending a UDP Paket via Netty.
processRecord(record);
if (lastCommitHappendFiveOrMoreSecondsAgo()) {
kafkaConsumer.commitAsync();
}
}
}
kafka-topics.sh描述__consumer_offsets主题,如下所示
Topic:__consumer_offsets PartitionCount:50
ReplicationFactor:3 Configs:segment.bytes=104857600,
cleanup.policy=compact,compression.type=uncompressed
重新启动的代理的server.log显示,从__consumer_offsets主题的特定分区加载偏移量需要很长时间(在本例中大约22分钟)。这与用户的“轮询”调用被阻止的时间相关。
[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.
我想知道是什么使得加载过程如此缓慢,该怎么做呢?
找到原因了。
代理的server.xml配置文件包含属性
log.cleaner.enable=false
(默认情况下,对于版本0.9.0.1,该属性为true)这意味着kafkas内部compacted__consumer_offsets主题实际上没有被压缩,因为禁用了log-cleaner。实际上,本主题的一些分区增长到了几千兆字节的大小,这就解释了当一个新的组协调器需要重新填充它的缓存时,读取所有消费者偏移数据所需要的时间。
我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这
但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?
问题内容: 我正在使用以下代码: xlsx文件本身具有25,000行,每行包含500列的内容。在调试过程中,我看到创建XSSFWorkbook的第三行需要很长时间(1小时!)来完成此语句。 有没有更好的方法来访问原始xlsx文件的值? 问题答案: 首先,当您有文件时,不要从InputStream加载XSSFWorkbook!使用InputStream需要将所有内容缓冲到内存中,这会占用空间并占用时
我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?
我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350
我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了