我遇到了一件关于Kafka再平衡的奇怪事情。如果我增加某个主题的分区,而该主题是由一些java使用者(在同一个组中)订阅的,则不会发生使用者再平衡。在那之后,我试图通过启动一个新的消费者(或杀死一个消费者)来实现重新平衡,但在这个重新平衡中无法分配新增加的分区。我发现只有在停止所有使用者并启动它们之后,才能分配新分区。我不知道这是正常还是有任何解释。
下面是我在电脑上的测试:
1.启动Kafka,ZK。创建一个包含1个分区的普通主题(测试主题)
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000
2.启动2个java消费者(C1,C2),订阅测试主题
3.增加2个测试主题分区
$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
C1、C2中未发生再平衡
4.开始订阅测试主题的新使用者C3。重新平衡发生,但重新分配中只涉及分区test-topic-0,既不涉及test-topic-1也不涉及test-topic-2。
5.我试图通过停止C2和C3来实现再平衡。但是,测试主题1和测试主题2仍未分配。
6.停止所有正在运行的使用者,然后启动它们。正常分配所有测试主题0,1,2。
Kafka
动物园管理员:3.4。13
消费者代码:
public class KafkaConsumerThread extends Thread {
// consumer settings
public static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrap);
props.put("group.id", groupName);
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", true);
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
@Override
public void run() {
log.info("Start consumer ..");
consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
while (!stop) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
receivedRecordNumber.addAndGet(records.count());
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
log.info("Receive [key:{}][value:{}]", record.key(), record.value());
}
} catch (TimeoutException e) {
log.info("no data");
}
}
consumer.close();
}
}
感谢@Aftab虚拟的评论。我再次测试,等待更长的时间。在第一个消费者启动后大约5分钟,重新平衡确实会自动提高,并且所有分区test-toption-0,1,2都重新分配。因此,Kafka确实有一个自动重新平衡后,改变分区。
此外,我按照@Aftab Virtual的建议更换领导。不平衡检查间隔秒
至30秒。然而,涉及所有分区的重新平衡发生在分区增加大约3分钟后。我确实为代理添加了设置:
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 30
我不知道这种再平衡的机制是什么。并且没有更多的日志用于此重新平衡:
[2018-10-18 11:32:47,958] INFO [GroupCoordinator 0]: Preparing to rebalance group test-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,963] INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,964] INFO [GroupCoordinator 0]: Assignment received from leader for group test-group for generation 5 (kafka.coordinator.group.GroupCoordinator)
在向Kafka团队和一些Kafka用户寻求建议后,我得到了对我测试结果的解释。这不是虫子。
分区的增加将标记metadata.update需要=true。然而,这将不会真正触发更新,直到下一个元数据过期时间(默认的metadata.max.age.ms
是5*60*1000 ms)。在组的领导者更新其元数据之前,由改变消费者号码引起的重新平衡不会涉及新的分区。
我减少了元数据。最大年龄。ms
到30秒,Kafka对分区的增加变得更加敏感。
首先,很抱歉,如果我的术语不准确,我对Kafka很陌生,我已经尽可能多地读过了。我们有一个使用kafkastreams的服务,kafka版本:2.3.1。流应用程序具有一个流拓扑,该流拓扑从“topica”读取,执行转换并发布到另一个主题“topicb”,然后由拓扑的另一个流消费,并使用Ktable(localstore)聚合它。侦听器将ktable更改发布到另一个主题中。 主题有24个分区。我们
Kafka重新平衡旨在将主题的所有分区重新分配给订阅组活动成员,以便在任何给定时刻只有一个使用者使用任何主题分区。因此,如果消费者刚刚订阅了一个主题,那么一切都很清楚,但消费者api还提供了一种方法,可以将特定分区分配给消费者:
当我们的kafka主题中有多个分区时,分区重新平衡是一件常见的事情吗? 这并不一定意味着我们的应用程序存在延迟或问题? 我一直看到分区被撤销和重新分配的日志。
我对再平衡有些怀疑。现在,我正在手动将分区分配给使用者。因此,根据文件,如果消费者离开/崩溃在一个消费群体中,就不会有再平衡。 假设同一组中有3个分区和3个使用者,每个分区都是手动分配给每个使用者的。一段时间后,第三个消费者倒下了。既然没有再平衡,我可以采取什么措施来确保停机时间最小化?我是否需要更改前两个分区中任何一个的配置,以从第三个分区或其他分区开始使用?
我有一个关于2个代理上的3个分区的主题。(Kafka版本:0.8.1) 使用不同的用户guid(如:FC42B34DD7658503E040970A2C437358)作为分区密钥批量添加消息。(约10K条消息) 在加载消息时,我有一个正在运行的消费者(consumer1),它开始很好地处理消息。 然后我用相同的消费者组ID启动了另一个消费者(consumer2)。 我希望两个消费者都应该分配负载。
根据Kafka的文件: kafka保证主题分区只分配给组中的一个消费者。 但我在服务中观察到了不同的行为。以下是一些细节: 我用的是Kafka2.8和SpringKafka2.2.13。 最初我有一个Kafka主题包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的注释,并发性=5。这个配置对我来说很好。 后来,我开始