我正在使用spring批处理,有一个包含两个步骤的工作,第一步(tasklet)验证头CSV,第二步读取一个CSV文件并写入另一个CSV文件,如下所示:
@Bean
public ClassifierCompositeItemWriter<POJO> classifierCompositeItemWriter() throws Exception {
Classifier<POJO, ItemWriter<? super POJO>> classifier = new ClassiItemWriter(ClassiItemWriter.itemWriter());
return new ClassifierCompositeItemWriterBuilder<POJO>()
.classifier(classifier)
.build();
}
@Bean
public Step readAndWriteCsvFile() throws Exception {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<POJO, POJO>chunk(10000)
.reader(ClassitemReader.itemReader())
.processor(processor())
.writer(classifierCompositeItemWriter())
.build();
}
在阅读CSV之前,我使用了一个FlatFileItemReader(在ClassitemReader中)和一个FlatFileItemWriter(在ClassItemWriter中)。我通过下面的tasklet检查CSV文件的头是否正确:
@Bean
public Step fileValidatorStep() {
return stepBuilderFactory
.get("fileValidatorStep")
.tasklet(fileValidator)
.build();
}
如果是这样,我将处理从接收到的CSV文件到另一个文件CSV的转换。
在JobBuilderFactor
中,我检查来自taskletFileValidatorStep
的ExistStatus是否为“完成”,以将进程转发到ReadandWriteCSvFile()
,如果不是“完成”,并且taskletFileValidatorStep
返回ExistStatus“错误”,则作业结束并退出处理。
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(fileValidatorStep()).on("ERROR").end()
.next(fileValidatorStep()).on("COMPLETED").to(readAndWriteCsvFile())
.end().build();
}
问题是,当我启动作业时,Beanreadandwritecsvfile
首先运行任务,这意味着spring批处理的读取器和写入器的标准Bean总是在生命周期中加载,然后我才能验证头和检查存在状态,读取器仍在工作,读取文件并将数据放入另一个文件中,而不需要检查,因为在启动作业期间,在所有任务之前加载Bean。
如何在FileValidatorStep
之后启动ReadandWriteCsvFile
方法?
你不需要一个流动的工作,一个简单的工作就足够了。下面是一个简单的示例:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {
@Bean
public Step validationStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("validationStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if(!isValid()) {
throw new Exception("Invalid file");
}
return RepeatStatus.FINISHED;
}
private boolean isValid() {
// TODO implement validation logic
return false;
}
})
.build();
}
@Bean
public Step readAndWriteCsvFile(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("readAndWriteCsvFile")
.<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
.writer(items -> items.forEach(System.out::println))
.build();
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(validationStep(stepBuilderFactory))
.next(readAndWriteCsvFile(stepBuilderFactory))
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
在本例中,如果validationstep
失败,则下一步是将不执行。
我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。
(A)ItemReader[第一输入]->(A)ItemProcessor[第一输入]->(B)ItemReader[使用处理过的输入从另一个源收集第二输入]->(B)ItemProcessor[使用处理过的第一输入和第二输入]->{repeat B}->ItemWriter(最终结果) 有没有人知道如何在Spring批处理中这样做?多谢了。
我有“N”没有的。客户/客户。对于每个客户/客户,我需要从数据库(读取器)中获取记录,然后我必须处理(处理器)客户/客户的所有记录,然后我必须将记录写入文件(写入器)。 如何将spring批处理作业循环N次?
当读卡器、处理器和写入程序在一个步骤中运行时,我只需要打印一次日志。如何做到这一点,使日志不会在每次处理数据块时打印? 批次类 处理器侦听器示例类我还有另一个要编写的侦听器类。
根据已接受的答案代码,对该代码的以下调整对我起作用: 我已经将这个问题更新到了一个可以正确循环的版本,但是由于应用程序将扩展,能够处理并行是很重要的,我仍然不知道如何在运行时用javaconfig动态地做到这一点... 基于查询列表(HQL查询),我希望每个查询都有一个读取器-处理器-写入器。我当前的配置如下所示: 工单 处理机 作家 目前,该过程对于单个查询来说工作得很好。然而,我实际上有一个查
我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,