当前位置: 首页 > 知识库问答 >
问题:

spring cloud stream Kafka消费者不会使用服务关闭时发送的消息吗?

沈高峻
2023-03-14

我有一个spring cloud stream Kafka消费者服务,其中确认是手动完成的。提供了一个固定的用户组。

spring.cloud.stream.bindings.input.group

RESETOFFSET和startOffset属性的设置如下所示。云流动Kafka。绑定。输入消费者resetOffsets=真实Spring。云流动Kafka。绑定。输入消费者STARTOFSET=最新

消费者服务侦听主题模式。

场景:消费者服务关闭,在此期间将一些消息发送到它的主题。当消费者服务恢复时,这些消息不会被消费。只有在它重新上线后发送的消息被消费。

这是否符合预期?我正处于Kafka学习阶段,如果能给我一个解释,我将不胜感激。

共有2个答案

羊舌洛华
2023-03-14

通常,当消费者加入消费者组时,它将从最后提交的偏移量中获取。

startOffSet属性有两个选项“最早”和“最晚”。当使用者启动但分配的分区没有提交的偏移量时,将使用这些选项。

在您的场景中,您没有在从分配的分区读取后提交偏移量,因此当您的使用者服务返回时,它只读取最新的消息。

干善
2023-03-14

您明确告诉活页夹使用

spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true 
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest

只需移除这些属性;与组的绑定通常会提供所需的行为。

 类似资料:
  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 我有一个生产者和一个消费者。消费者的多个实例正在运行。当生产者发布消息时,我的意图是通过所有实例消费该消息。所以,我使用的是直接交换。生产者将带有主题的消息发布到直接交换。消费者正在通过独占队列收听该主题。当消费者启动并且生产者发布消息时,此过程运行良好。但是当消费者关闭并且生产者发布消息时,消费者在启动时不会消费此消息。 我在谷歌上搜索了这个问题。建议使用命名队列。但是,如果使用命名队列,则消息

  • 这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正

  • 最近,我们与Kafka消费者和生产商之间出现了一些性能问题。我们在scala中使用Kafka Java API。打开和关闭消费者和生产者对象的良好做法是什么?我认为这是一个非常开放的问题,正确的答案总是,但我正在尝试对此进行推理。 消费者可以长时间运行连接并保持开放吗? 当我们完成信息生成时,生产者是否应该关闭?