当前位置: 首页 > 知识库问答 >
问题:

Spring批处理避免在tasklet之前启动读取器和写入器

龙永思
2023-03-14

我正在使用spring批处理,有一个包含两个步骤的工作,第一步(tasklet)验证头CSV,第二步读取一个CSV文件并写入另一个CSV文件,如下所示:

@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
    Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
    return new ClassifierCompositeItemWriterBuilder<POJO>()
            .classifier(classifier)
            .build();
}



@Bean
public Step readAndWriteCsvFile() throws Exception {
    return stepBuilderFactory.get("readAndWriteCsvFile")
            .<POJO, POJO>chunk(10000)
            .reader(ClassitemReader.itemReader())
            .processor(processor())
            .writer(classifierCompositeItemWriter())
            .build();
}

在阅读CSV之前,我使用了一个FlatFileItemReader(在ClassitemReader中)和一个FlatFileItemWriter(在ClassItemWriter中)。我通过下面的tasklet检查CSV文件的头是否正确:

@Bean
public Step fileValidatorStep() {
    return stepBuilderFactory
            .get("fileValidatorStep")
            .tasklet(fileValidator)
            .build();
}

如果是这样,我将处理从接收到的CSV文件到另一个文件CSV的转换。

JobBuilderFactor中,我检查来自taskletFileValidatorStep的ExistStatus是否为“完成”,以将进程转发到ReadandWriteCSvFile(),如果不是“完成”,并且taskletFileValidatorStep返回ExistStatus“错误”,则作业结束并退出处理。

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(fileValidatorStep()).on("ERROR").end()
            .next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
            .end().build();
}

问题是,当我启动作业时,Beanreadandwritecsvfile首先运行任务,这意味着spring批处理的读取器和写入器的标准Bean总是在生命周期中加载,然后我才能验证头和检查存在状态,读取器仍在工作,读取文件并将数据放入另一个文件中,而不需要检查,因为在启动作业期间,在所有任务之前加载Bean。

如何在FileValidatorStep之后启动ReadandWriteCsvFile方法?

共有1个答案

燕烨
2023-03-14

你不需要一个流动的工作,一个简单的工作就足够了。下面是一个简单的示例:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Step validationStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("validationStep")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        if(!isValid()) {
                            throw new Exception("Invalid file");
                        }
                        return RepeatStatus.FINISHED;
                    }

                    private boolean isValid() {
                        // TODO implement validation logic
                        return false;
                    }
                })
                .build();
    }

    @Bean
    public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("readAndWriteCsvFile")
                .<Integer, Integer>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("job")
                .start(validationStep(stepBuilderFactory))
                .next(readAndWriteCsvFile(stepBuilderFactory))
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

在本例中,如果validationstep失败,则下一步是将不执行。

 类似资料:
  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • (A)ItemReader[第一输入]->(A)ItemProcessor[第一输入]->(B)ItemReader[使用处理过的输入从另一个源收集第二输入]->(B)ItemProcessor[使用处理过的第一输入和第二输入]->{repeat B}->ItemWriter(最终结果) 有没有人知道如何在Spring批处理中这样做?多谢了。

  • 我有“N”没有的。客户/客户。对于每个客户/客户,我需要从数据库(读取器)中获取记录,然后我必须处理(处理器)客户/客户的所有记录,然后我必须将记录写入文件(写入器)。 如何将spring批处理作业循环N次?

  • 当读卡器、处理器和写入程序在一个步骤中运行时,我只需要打印一次日志。如何做到这一点,使日志不会在每次处理数据块时打印? 批次类 处理器侦听器示例类我还有另一个要编写的侦听器类。

  • 根据已接受的答案代码,对该代码的以下调整对我起作用: 我已经将这个问题更新到了一个可以正确循环的版本,但是由于应用程序将扩展,能够处理并行是很重要的,我仍然不知道如何在运行时用javaconfig动态地做到这一点... 基于查询列表(HQL查询),我希望每个查询都有一个读取器-处理器-写入器。我当前的配置如下所示: 工单 处理机 作家 目前,该过程对于单个查询来说工作得很好。然而,我实际上有一个查

  • 我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,