我使用的是Kafka0.10.2,现在面临一个CommitFailedException。如:
无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。
我已经将max.poll.interval.ms设置为integer.max_value。那么有谁能告诉我为什么即使我已经设置了值,这种情况还是会发生吗?
另一个问题是:我按照描述将session.timeout.ms设置为60000,但它仍然会发生。我试图通过一个简单的代码来复制
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
当我将session.timeout.ms设置为10000时,我尝试在轮询循环中睡眠超过10000毫秒,但似乎工作正常,没有例外。所以我对此感到困惑。如果heartbeat是由consumer.poll和consumer.commit触发的,那么在我的代码中heartbeat似乎超出了会话超时。为什么不抛出CommitFailedException?
嗨,为此,您需要在代码中处理重新平衡条件,并且应该处理正在进行的消息并在重新平衡之前提交它
如:
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Implement what you want to do once rebalancing is done.
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// commit current method
}
}
并使用以下语法订阅主题:
KafkaConsumer.subscribe(topicNameList,new HandleRebalance())
这样做的好处是:
>
在重新平衡时不会重复消息。
无提交失败异常
在使用者上设置的session.timeout.ms
应小于在Kafka Broker上设置的group.max.session.timeout.ms
。
这为我解决了问题。
Credit to github链接提交失败
有人能帮忙吗?原因和解决方法是什么? 此外,当我的1个kafka代理关闭时,我的kafka流应用程序没有连接到其他代理?我已经设置了
环境:Hadoop2.75.+Flink1.4+Kafka0.10 我已经建立了一个实时数据处理项目。我使用Flink表源API(Kafka010JsonTableSource)作为表源。从kafka获取数据,然后执行一个SQL,最后输出到一个kafka主题。这是一个清晰的流程,但是我在Flink集群上执行时遇到了异常,下面是我的主要代码: 我已经启用了检查点。第一次在flink上执行时,我只是遵
我正在使用kafka 0.9.0.1代理和0.9.0.1消费者客户端。我的使用者实例正在使用处理时间小于1秒的记录。和其他主要配置 一小时一两次。每天消耗约60亿次事件。似乎偏移量只存储在主题“__consumer_offsets”的一个分区中。它还会增加特定代理的负载。 有人知道这些问题吗?
今天,在我的Spring Boot和单实例Kafka应用程序中,我遇到了以下问题: CommitFailedException:无法完成提交,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减
我遇到了一件关于Kafka再平衡的奇怪事情。如果我增加某个主题的分区,而该主题是由一些java使用者(在同一个组中)订阅的,则不会发生使用者再平衡。在那之后,我试图通过启动一个新的消费者(或杀死一个消费者)来实现重新平衡,但在这个重新平衡中无法分配新增加的分区。我发现只有在停止所有使用者并启动它们之后,才能分配新分区。我不知道这是正常还是有任何解释。 下面是我在电脑上的测试: 1.启动Kafka,
本文向大家介绍消费组与分区重平衡相关面试题,主要包含被问及消费组与分区重平衡时的应答技巧和注意事项,需要的朋友参考一下 当有新的消费者加入到消费者组时,原本的分区就需要重新分配;比如一个topic有30个分区,原本只有两个消费者,每人负责15个分区,当新加入一个消费者时,并没有分区可以给他消费,只能是将30个分区重新分配。 每个消费者组都会有一个broker负责协调(称为group coordin