我将Kafka用于一个消息传递应用程序。对于这个应用程序,有一个生产者将消息放入一个主题,消费者注册到这个主题,并消费这些消息。这些消费者是Dockerized应用程序。出于自动缩放的目的,每个使用者在创建时都注册为具有唯一ID的使用者。
Consumer2被创建为docker容器,并将自身注册为ID为Consumer2
的使用者
现在,无论出于什么原因,consumer1
失败,并被consumer3
替换,后者将自己注册为kafka的使用者,其ID为consumer3
。
问题是,consumer1
不再使用。从长远来看,将会有多个未使用的消费者。
如果consumer1和consumer3属于同一个consumer组,则consumer3将从consumer1停止的地方开始读取消息。这是因为Kafka维护特定于一个消费群体的抵销。因此,如果具有相同消费者组的消费者中有一个失败,其他消费者将使用偏移量,从而避免重新处理数据。
Kafka broker不会像您在问题中假设的那样在任何地方维护失败的消费者日志。
我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。 我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes
我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?
在某些情况下,我使用Kafka流对主题的小内存(hashmap)投影进行建模。K,V缓存确实需要一些操作,因此它不是GlobalKTable的好例子。在这种“缓存”场景中,我希望我的所有兄弟实例都具有相同的缓存,因此我需要绕过消费者组机制。 要实现这一点,我通常只需使用随机生成的应用程序ID启动我的应用程序,因此每个应用程序每次重新启动都会重新加载主题。唯一的警告是,我最终会有一些消费者群体在Ka
我试图构建一个简单的spring boot Kafka Consumer来消费来自Kafka主题的消息,但是由于KafkaListener方法没有被触发,所以没有任何消息被消费。 Java类: start.java: kafkaConsumerConfig.java:
我正在读这篇: 自动提交提交偏移量最简单的方法是允许消费者为您执行。如果您配置启用。汽车commit=true,则每五秒钟消费者将提交客户端从poll()收到的最大偏移量。五秒钟的间隔是默认值,由设置“自动”控制。犯罪间隔ms.与消费者中的其他所有内容一样,自动提交由轮询循环驱动。无论何时进行轮询,使用者都会检查是否到了提交的时间,如果是,它将提交上次轮询中返回的偏移量。 也许问题是我的英语不好,
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费