Question:

Spring Batch: How to read the footer of a CSV file and validate it using FlatFileItemReader

松铭
2023-03-14


Here is my code.

@Bean
@StepScope
public FlatFileItemReader<Movie> movieItemReader(String filePath) {
        FlatFileItemReader<Movie> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);   //skip header line
        reader.setResource(new PathResource(filePath));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("|");
        DefaultLineMapper<Movie> movieLineMapper = new DefaultLineMapper<>();
        FieldSetMapper<Movie> movieMapper = movieFieldSetMapper();

        movieLineMapper.setLineTokenizer(tokenizer);
        movieLineMapper.setFieldSetMapper(movieMapper);   // use the local variable declared above
        movieLineMapper.afterPropertiesSet();
        reader.setLineMapper(movieLineMapper);
        return reader;
}

public FieldSetMapper<Movie> movieFieldSetMapper() {
        BeanWrapperFieldSetMapper<Movie> movieMapper = new BeanWrapperFieldSetMapper<>();
        movieMapper.setTargetType(Movie.class);
        return movieMapper;
}

1 answer

朱啸
2023-03-14

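You can use a chunk-oriented step as a validation step that runs before your job's business logic. This step uses an ItemReadListener to save the last item read (the footer) and a StepExecutionListener to perform the validation. Here is a quick example: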

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    @StepScope
    public FlatFileItemReader<String> itemReader() {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);   //skip header line
        reader.setResource(new ByteArrayResource("header\nitem1\nitem2\n2".getBytes()));
        reader.setLineMapper(new PassThroughLineMapper());
        return reader;
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return items -> {
            for (String item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step1() {
        MyListener myListener = new MyListener();
        return steps.get("step1")
                .<String, String>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .listener((ItemReadListener<String>) myListener)
                .listener((StepExecutionListener) myListener)
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Total count is ok as validated by step1");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    static class MyListener extends StepExecutionListenerSupport implements ItemReadListener<String> {

        private String lastItem;

        @Override
        public void beforeRead() {
        }

        @Override
        public void afterRead(String item) {
            this.lastItem = item;
        }

        @Override
        public void onReadError(Exception ex) {

        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            int readCount = stepExecution.getReadCount();
            int totalCountInFooter = Integer.parseInt(this.lastItem); // TODO sanity checks (number format, etc)
            System.out.println("readCount = " + (readCount - 1)); // subtract footer from the read count
            System.out.println("totalCountInFooter = " + totalCountInFooter);
            // TODO do validation on readCount vs totalCountInFooter
            return ExitStatus.COMPLETED; // return appropriate exit status according to validation result
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This example prints:

item = item1
item = item2
item = 2
readCount = 2
totalCountInFooter = 2
Total count is ok as validated by step1
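
The two TODO comments in afterStep (the sanity check on the footer and the comparison of counts) could be filled in along the following lines. This is only a minimal sketch in plain Java with no Spring dependency: the FooterValidator class and its method names are hypothetical, and it assumes the footer line holds nothing but the expected number of data lines.

```java
import java.util.OptionalInt;

/**
 * Hypothetical helper (not part of Spring Batch) that compares the number of
 * data lines read with the total announced in the footer line.
 */
final class FooterValidator {

    private FooterValidator() {
    }

    /** Parses the footer as a non-negative integer; empty if it is missing or malformed. */
    static OptionalInt parseFooterCount(String footerLine) {
        if (footerLine == null) {
            return OptionalInt.empty();
        }
        try {
            int count = Integer.parseInt(footerLine.trim());
            return count >= 0 ? OptionalInt.of(count) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    /**
     * Returns true when the footer is a valid count and matches the data lines read.
     *
     * @param readCount  number of items read by the step, footer included
     * @param footerLine the last item read, expected to be the footer
     */
    static boolean isValid(long readCount, String footerLine) {
        OptionalInt expected = parseFooterCount(footerLine);
        // The footer itself was counted as a read item, so subtract it.
        return expected.isPresent() && expected.getAsInt() == readCount - 1;
    }
}
```

Inside afterStep, the result of FooterValidator.isValid(stepExecution.getReadCount(), this.lastItem) could then decide between ExitStatus.COMPLETED and ExitStatus.FAILED. How the job reacts to that exit status depends on how its step transitions are configured.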

Hope this helps.
