我需要将偏移量重置为一个数字。
详细要求:我的应用程序正在使用来自 kafka 主题的消息并将其转储到 DB 中,假设 DB 在处理已使用的消息时出现故障(offset=10),直到数据库关闭时应用程序使用的消息,直到偏移量 20。
现在DB在处理第20个偏移量消息时再次出现,现在我想再次将偏移量重置为10,以便将数据保存在数据库中。
我可以通过编程(Spring启动)来实现吗?我正在使用Spring云流活页夹Kafka。
如果数据库已关闭,为什么还要继续接收消息?
只需将侦听器容器配置为继续重试当前记录,直到数据库再次可用。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
return (container, topic, group) -> {
container.setErrorHandler(...);
};
}
看见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-到当前
将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?
有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?
我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人