当前位置: 首页 > 知识库问答 >
问题:

我们如何手动重置通过Spring启动java应用程序使用的Kafka主题的偏移量?

何修能
2023-03-14

我的要求是,当应用程序无法处理从kafka主题的当前偏移量读取的消息时,重置kafka话题的偏移量,该偏移量是通过Spring Bootjava应用程序使用的。手动重置偏移量或发送否定确认后,需要通过spring boot java应用程序的kafka消费者从未提交的偏移量值再次轮询消息。这能实现吗?

共有1个答案

益英逸
2023-03-14

这就是Spring Kafka版本的问题

如果您使用的是Spring kafka 2.0.1,则可以使用SeekToCurrentErrorHandler,这将导致在下次轮询中发送失败和未处理的记录。

要使用上述处理程序配置侦听器容器,您需要将其添加到容器属性中

以下是Kafka显示器容器工厂的一个例子

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(this.consumerFactory);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}

这里是详细的留档:
https://docs.spring.io/spring-kafka/reference/html/_reference.html#_seek_to_current_container_error_handlers

 类似资料:
  • 我有一个Kafka主题和一个消费者,在Spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从一开始就开始读取所有接收到的消息。这应该是通过属性实现的,但从这个问题可以清楚地看出,它目前不起作用。 我在kafka消费者api中发现了这个变通方法,它建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。在Spring Cloud St

  • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350

  • 我有一个spring启动应用程序,我需要通过进入文件夹目录并通过命令行启动我的web应用程序来启动它。我有一门课叫应用。java及其内部代码如下所示。 我设置了类路径,然后试图运行命令"",但我得到了这个错误消息""

  • 我尝试使用本地主机url访问我的endpoint——http://localhost:8080/这是我的Application.java文件 这是我的终点 我试着用这个网址http://localhost:8080/all

  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 问题内容: 如何重新启动Java AWT应用程序?我有一个附加了事件处理程序的按钮。我应该使用什么代码来重新启动应用程序? 我想做与应用程序中相同的事情。 问题答案: 当然,可以重新启动Java应用程序。 以下方法显示了一种重新启动Java应用程序的方法: 基本上,它执行以下操作: 查找Java可执行文件(我在这里使用了Java二进制文件,但这取决于您的要求) 查找应用程序(在我的情况下是一个ja