当前位置: 首页 > 知识库问答 >
问题:

如何自动删除未使用的Kafka消费者

楚和悌
2023-03-14

我将Kafka用于一个消息传递应用程序。对于这个应用程序,有一个生产者将消息放入一个主题,消费者注册到这个主题,并消费这些消息。这些消费者是Dockerized应用程序。出于自动缩放的目的,每个使用者在创建时都注册为具有唯一ID的使用者。

Consumer2被创建为docker容器,并将自身注册为ID为Consumer2的使用者

现在,无论出于什么原因,consumer1失败,并被consumer3替换,后者将自己注册为kafka的使用者,其ID为consumer3

问题是,consumer1不再使用。从长远来看,将会有多个未使用的消费者。

共有1个答案

萧韬
2023-03-14

如果consumer1和consumer3属于同一个consumer组,则consumer3将从consumer1停止的地方开始读取消息。这是因为Kafka维护特定于一个消费群体的抵销。因此,如果具有相同消费者组的消费者中有一个失败,其他消费者将使用偏移量,从而避免重新处理数据。

Kafka broker不会像您在问题中假设的那样在任何地方维护失败的消费者日志。

 类似资料:
  • 我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。 我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes

  • 我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?

  • 在某些情况下,我使用Kafka流对主题的小内存(hashmap)投影进行建模。K,V缓存确实需要一些操作,因此它不是GlobalKTable的好例子。在这种“缓存”场景中,我希望我的所有兄弟实例都具有相同的缓存,因此我需要绕过消费者组机制。 要实现这一点,我通常只需使用随机生成的应用程序ID启动我的应用程序,因此每个应用程序每次重新启动都会重新加载主题。唯一的警告是,我最终会有一些消费者群体在Ka

  • 我试图构建一个简单的spring boot Kafka Consumer来消费来自Kafka主题的消息,但是由于KafkaListener方法没有被触发,所以没有任何消息被消费。 Java类: start.java: kafkaConsumerConfig.java:

  • 我正在读这篇: 自动提交提交偏移量最简单的方法是允许消费者为您执行。如果您配置启用。汽车commit=true,则每五秒钟消费者将提交客户端从poll()收到的最大偏移量。五秒钟的间隔是默认值,由设置“自动”控制。犯罪间隔ms.与消费者中的其他所有内容一样,自动提交由轮询循环驱动。无论何时进行轮询,使用者都会检查是否到了提交的时间,如果是,它将提交上次轮询中返回的偏移量。 也许问题是我的英语不好,

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费