我目前正在做一个kafka java项目。我是新来的,我发现很难理解与Kafka生产者/消费者设计相关的几个基本概念。
>
比方说,我有一个带有单个分区的主题,我有一个生产者正在写这个主题,一个消费者正在从这个主题中消费。如果我部署同一个应用程序的多个实例,每个实例将运行自己的消费者。在这种情况下,因为所有消费者都属于同一个group pId,所以消息是否会在多个实例上运行的消费者之间平均分配?
如何从应用程序中定期检查消费者是否还活着?。
请澄清上述问题。如果我的任何/所有假设/理解都是错误的,请纠正我。我知道我没有分享任何代码示例,因为这些都是概念性的问题。如果需要,我可以共享代码片段。
>
Kafka只允许每个分区和用户组有一个用户。因此,多个消费者将等待主消费者倒下,然后在该点跳入,但对于给定的分区/消费者组,一次只有一个消费者在消费。
您可以检查消费者组偏移是否与最大偏移保持一致,以查看是否存在滞后。如果一个人开始有问题,Kafka在循环多个消费者方面做得很好
重启应用程序,它会从中断的地方开始,只是不要从一开始就重启消息。Kafka为你处理。
我们用Spring靴搭配SpringKafka。没有“最佳”方法,但我们发现使用spring进行设置和维护很容易。
>
您说的主题使用单个分区意味着它无法将消息分发到多个分区。你会失去Kafka的一大优势。你必须增加一个以上的分区。如果您部署了同一个应用程序的多个实例,则分发没有帮助,因为消息发布到您提到的一个分区中,并且只有一个实例将仅分配给该分区,其他实例将处于空闲状态。
您可以使用AdminClient Kafka API检查您的消费者是否存在任何滞后
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9091");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
AdminClient client = org.apache.kafka.clients.admin.AdminClient.create(props);
ListConsumerGroupOffsetsResult offsets = client.listConsumerGroupOffsets("consumerId");
Map<TopicPartition, OffsetAndMetadata> tt = offsets.partitionsToOffsetAndMetadata().get();
ListConsumerGroupOffsetsResult offsets = client.listConsumerGroupOffsets(consumerId);
Map<TopicPartition, OffsetAndMetadata> tt = offsets.partitionsToOffsetAndMetadata().get();
for (Entry<TopicPartition, OffsetAndMetadata> entry : tt.entrySet()) {TopicPartition tp = entry.getKey();
OffsetAndMetadata op = entry.getValue();
Collections.singletonList(tp);
consumer.assign(Collections.singletonList(tp));
consumer.seekToEnd(Collections.singletonList(tp));
System.out.println(consumerId + "," + tp.partition() + "," + consumer.position(tp) + ","
+ op.offset() + "," + (consumer.position(tp) - op.offset()));
}
您还没有说明在何处部署,但如果您在mesos中使用marathon进行部署,它将自动重新启动。您可以手动重新启动,如果您使用与上一个相同的组id,您的应用程序将开始使用它离开的位置。
谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但
我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费
我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #
我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。
当一个新的消费者/borker被添加或下降时,Kafka会触发一个再平衡操作。Kafka是在重新平衡封锁行动。Kafka的消费者是不是在一个再平衡操作正在进行的时候就被封锁了?