当前位置: 首页 > 知识库问答 >
问题:

Spring cloud stream kafka绑定器以编程方式将kafka主题(多个分区)偏移量重置为任意数字

毋修为
2023-03-14

我需要将偏移量重置为一个数字。

详细要求:我的应用程序正在使用来自 kafka 主题的消息并将其转储到 DB 中,假设 DB 在处理已使用的消息时出现故障(offset=10),直到数据库关闭时应用程序使用的消息,直到偏移量 20。

现在DB在处理第20个偏移量消息时再次出现,现在我想再次将偏移量重置为10,以便将数据保存在数据库中。

我可以通过编程(Spring启动)来实现吗?我正在使用Spring云流活页夹Kafka。

共有1个答案

慎俊雄
2023-03-14

如果数据库已关闭,为什么还要继续接收消息

只需将侦听器容器配置为继续重试当前记录,直到数据库再次可用。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
    return (container, topic, group) -> {
        container.setErrorHandler(...);
    };
}

看见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-到当前

 类似资料:
  • 将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?

  • 有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?

  • 我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终

  • 现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人