当前位置: 首页 > 知识库问答 >
问题:

春云流3.x-回放消息策略

燕寒
2023-03-14

我正在寻找一些关于利用Spring Cloud Stream 3 . x/Kafka binder实现的Kafka主题的重放消息策略的指导-

> < li>

重播特定消息[例如通过时间戳窗口]。如何为消费者组中的所有或部分消费者重置补偿?

是否可以从主题的特定分区重播[如果我们知道我们有兴趣重放的消息的分区]?

一般来说,关于消息回放的最佳实践是什么。感谢您抽出时间。

共有1个答案

田翔
2023-03-14

添加一个重新平衡侦听器 bean,它将被连接到绑定器中......

@Bean
KafkaBindingRebalanceListener rebal() {
    return new KafkaBindingRebalanceListener() {

        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) {

            consumer.seekToBeginning(partitions);
        }

    };
}

您可以使用任何消费者寻求操作;您也可以调用< code>consumer.offsetsForTimes(...)等。

对于第一次重新平衡,< code>initial标志为真,对于其他重新平衡为假。

 类似资料:
  • 我正在使用Spring Cloud Stream(Edgware.SR5)和Spring Boot(1.5.10.RELEASE)。我的@StreamListener正在处理收到的每条消息两次。 该示例的思想是在队列中发布消息并对其进行处理。 服务: 绑定: application.properties: 配置(用于在测试中注入代理服务): 测试: 我得到了以下输出: 我不知道我的配置有什么问题,

  • 我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我的使用者绑定到匿名使用者组,而不是我指定的使用者组。 我的春靴应用 我的输入输出通道接口 我的控制台日志-- :在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2

  • 我试图改变生产者和消费者配置的顺序,但没有帮助。 编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。 它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。

  • 我正在尝试用spring cloud stream实现spring cloud契约。我有一个使用StreamBridge的制作人 方法sendMessage()是从rest控制器调用的。 我的合同是这样的: 当我运行测试时,会调用triggerCreateOrganization()方法,并在日志中看到日志消息“生产组织到主题”。 我在生成的测试的基类上有@AutoConfigureMessage