当使用者组a的一个Kafka使用者连接到Kafka代理时,我希望搜索到所有分区的末尾,即使在代理端存储了一个偏移量。如果更多的其他消费者为同一个消费者组连接,他们应该提取最新存储的偏移量。我正在做以下工作:
consumer.poll(timeout)
consumer.seekToEnd(emptyList())
while(true) {
val records = consumer.poll(timeout)
if(records.isNotEmpty()) {
//print records
consumer.commitSync()
}
}
问题是,当我连接消费者组A的第一个消费者c1时,一切都按预期工作,如果我连接消费者组A的另一个消费者c2,该组将重新平衡,c1将消耗跳过的抵消。
有什么想法吗?
您可以创建一个实现ConsumerReBalanceListener
的类,如下所示:
public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
private Consumer<K, V> consumer;
public AlwaysSeekToEndListener(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToEnd(partitions);
}
}
然后在订阅主题时使用此侦听器:
consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
我对再平衡有些怀疑。现在,我正在手动将分区分配给使用者。因此,根据文件,如果消费者离开/崩溃在一个消费群体中,就不会有再平衡。 假设同一组中有3个分区和3个使用者,每个分区都是手动分配给每个使用者的。一段时间后,第三个消费者倒下了。既然没有再平衡,我可以采取什么措施来确保停机时间最小化?我是否需要更改前两个分区中任何一个的配置,以从第三个分区或其他分区开始使用?
有人能告诉我Kafka消费者的再平衡算法是什么吗?我想了解分区计数和消费者线程是如何影响这一点的。 非常感谢。
我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。
我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的
当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了
假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?