我以前认为设置auto.offset.reset=latest
我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。
由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此auto.offset.reset=latest
策略将始终适用。我的疑虑就从这里开始了。假设以下场景:
my-topic
。auto.offset.reset
设置是两个使用者的最新
。my-topic
。groupid
是随机的,我没有设置任何使用者id,所以这意味着这是一个新的使用者(对吗?)。应用程序B不接收任何消息。所以,总结一下,如果我没有错的话,A接收到所有的消息,但是B错过了M4和M5。我在kafka-console-consumer.sh
中尝试了这一点,它的行为是这样的。
那么,如何让应用程序B在关闭时接收发布的消息呢?现在,如果我开始时分配与最初开始时相同的groupId,它将读取消息M4和M5,但这是在设置组ID。是否也可以设置消费者id,并获得相同的行为?
或者换一种说法,重新启动同一个消费者又是什么理解呢?如果两个消费者具有相同的groupId、相同的consumerId、两者都是相同的消费者?
顺便说一下,consumerId和属性client.ID是相同的?
如果两个使用者具有相同的group.id
设置,则它们位于同一组中。
我不完全确定您使用consumerid
是什么意思。从Kafka2.2开始,消费者配置中不存在这样的字段。
如果您讨论的是client.id
,则此设置没有任何功能影响,它只用于标记请求,以便在需要时在代理日志中匹配它们。
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。
我有两个Kafka监听器组件,每个组件监听不同的主题并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端id吗?还是必须使用不同的客户端id?如果客户端id必须不同,我想了解一个可以有效使用客户端id的用例。