当前位置: 首页 > 知识库问答 >
问题:

在spring批处理作业中使用KafkaItemReader时,一旦所有消息被处理并写入,如何提交偏移量。dat文件?

罗诚
2023-03-14

我开发了一个Spring批处理作业,它使用KafkaItemReader类读取Kafka主题。我只想在处理在定义块中读取的消息并将其成功写入输出时提交偏移量。dat文件。

@Bean
public Job kafkaEventReformatjob(
        @Qualifier("MaintStep") Step MainStep,
        @Qualifier("moveFileToFolder") Step moveFileToFolder,
        @Qualifier("compressFile") Step compressFile,
        JobExecutionListener listener)
{
    return jobBuilderFactory.get("kafkaEventReformatJob")
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .flow(MainStep)
            .next(moveFileToFolder)
            .next(compressFile)
            .end()
            .build();
}

@Bean
Step MainStep(
        ItemProcessor<IncomingRecord, List<Record>> flatFileItemProcessor,
        ItemWriter<List<Record>> flatFileWriter)
{
    return stepBuilderFactory.get("mainStep")
            .<InputRecord, List<Record>> chunk(5000)
            .reader(kafkaItemReader())
            .processor(flatFileItemProcessor)
            .writer(writer())
            .listener(basicStepListener)
            .build();
}
//Reader reads all the messages from akfka topic and sending back in form of IncomingRecord.
 @Bean
KafkaItemReader<String, IncomingRecord> kafkaItemReader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    List<Integer> partitions = new ArrayList<>();
    partitions.add(0);
    partitions.add(1);
    return new KafkaItemReaderBuilder<String, IncomingRecord>()
            .partitions(partitions)
            .consumerProperties(props)
            .name("records")
            .saveState(true)
            .topic(topic)
            .pollTimeout(Duration.ofSeconds(40L))
            .build();
}

  @Bean
public ItemWriter<List<Record>> writer() {
    ListUnpackingItemWriter<Record> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(flatWriter());
    return listUnpackingItemWriter;
}

public ItemWriter<Record> flatWriter() {
    FlatFileItemWriter<Record> fileWriter = new FlatFileItemWriter<>();
    String tempFileName = "abc";
    LOGGER.info("Output File name " + tempFileName + " is in working directory ");
    String workingDir = service.getWorkingDir().toAbsolutePath().toString();
    Path outputFile = Paths.get(workingDir, tempFileName);
    fileWriter.setName("fileWriter");
    fileWriter.setResource(new FileSystemResource(outputFile.toString()));
    fileWriter.setLineAggregator(lineAggregator());
    fileWriter.setForceSync(true);
    fileWriter.setFooterCallback(customFooterCallback());
    fileWriter.close();
    LOGGER.info("Successfully created the file writer");
    return fileWriter;

}

@StepScope
@Bean
public TransformProcessor processor() {
    return new TransformProcessor();
}

==============================================================================

 @BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

@AfterStep
public void afterStep(StepExecution stepExecution) {
    this.stepExecution.setWriteCount(count);
}

@Override
public void write(final List<? extends List<Record>> lists) throws Exception {

    List<Record> consolidatedList = new ArrayList<>();
    for (List<Record> list : lists) {
        if (!list.isEmpty() && null != list)
            consolidatedList.addAll(list);
    }

    delegate.write(consolidatedList);
    count += consolidatedList.size(); // to count Trailer record count
}

===============================================================

@覆盖公共列表过程(IncomingRecord记录){

    List<Record> recordList = new ArrayList<>();

    if (null != record.getEventName() and a few other conditions inside this section) {
        // setting values of Record Class by extracting from the IncomingRecord.
        recordList.add(the valid records which matching the condition);
        }else{
        return null;
        }

共有1个答案

王佐
2023-03-14

通过使用协调两个事务管理器(2PC协议)的JTA事务管理器,可以在两个事务资源(例如队列和数据库)之间同步读取操作和写入操作。

但是,如果其中一个资源不是事务性的(像大多数文件系统一样),这种方法是不可能的。因此,除非您使用事务性文件系统和协调kafka事务管理器和文件系统事务管理器的JTA事务管理器...您需要另一种方法,例如补偿事务模式。在您的情况下,“撤消”操作(补偿操作)将倒带偏移量到失败块之前的位置。

 类似资料:
  • 我用的是SpringKafka2.2.9的Spring靴2.1.9 如果消息多次失败(在afterRollbackProcessor中定义),消费者将停止轮询记录。但如果消费者重新启动,它会再次轮询相同的消息和进程。 但是我不希望消息再次被重新轮询,最好的阻止方法是什么? 这是我的配置 我怎样才能做到这一点?

  • 我第一次使用基于Kafka的Spring引导应用程序。我的要求是使用spring批处理创建一个包含所有记录的输出文件。我创建了一个spring批处理作业,其中集成了一个扩展KafkaItemReader的定制类。我现在不想提交偏移量,因为我可能需要从已经使用的偏移量中读取一些记录。我的消费者配置有这些属性; enable.auto.commit:错误自动偏移复位:最新group.id: 有两种情况

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我需要为以下有效日期创建。 我尝试了以下日期时间格式化程序,但在最近两个日期(和)中失败。 从到的所有日期都可以正常工作,但在尝试分析最近两个日期时,我得到一个: 线程“main”java.time.format.DateTimeParseException中出现异常:无法解析文本“2017-06-20T17:25:28.477777+0530”,在索引29处找到未解析的文本 用于解析我正在使用的

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“