当前位置: 首页 > 知识库问答 >
问题:

如何管理Spring批处理作业中使用的KafkaItemReader中的偏移量,以防在读取消息的过程中发生任何异常

岳京
2023-03-14

我第一次使用基于Kafka的Spring引导应用程序。我的要求是使用spring批处理创建一个包含所有记录的输出文件。我创建了一个spring批处理作业,其中集成了一个扩展KafkaItemReader的定制类。我现在不想提交偏移量,因为我可能需要从已经使用的偏移量中读取一些记录。我的消费者配置有这些属性;

enable.auto.commit:错误自动偏移复位:最新group.id:

有两种情况-

共有1个答案

邓赤岩
2023-03-14

您需要为此使用持久作业存储库并配置KafkaItemReader来保存其状态。状态包含分配给读取器的每个分区的偏移量,并将保存在块边界(也就是在每个事务中)。

在重新启动场景中,读取器将使用执行上下文中每个分区的最后偏移量进行初始化,并从中断的地方恢复。

 类似资料:
  • 我开发了一个Spring批处理作业,它使用KafkaItemReader类读取Kafka主题。我只想在处理在定义块中读取的消息并将其成功写入输出时提交偏移量。dat文件。 ============================================================================== ===================================

  • 我当前正在命令行中传递文件名在spring批处理作业中的参数并运行我的作业,spring批处理作业将查找文件并读取、处理和写入该文件。我目前在读取器中的作业参数文件名和读取器文件名,如何才能在处理器和写入器中使用相同的作业参数文件名。

  • 我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 问题内容: 我想使用通用方式来管理5xx错误代码,特别是当整个Spring应用程序中的db关闭时。我想要一个漂亮的错误json而不是堆栈跟踪。 对于控制器,我有一个针对不同异常的类,这也捕获了db在请求中间停止的情况。但这并不是全部。我也碰巧有一个自定义扩展名,在那里,当我打电话给我时,它将不受的管理。我在网上阅读了几件事,这只会让我更加困惑。 所以我有很多问题: 我是否需要实施自定义过滤器?我找

  • 我希望能够用REST控制器开始我的作业,然后当作业开始时,它应该在计划的基础上运行,直到我用REST再次停止它。