当前位置: 首页 > 知识库问答 >
问题:

Spring batch:使用注释时作业实例按顺序运行

籍弘伟
2023-03-14
@Configuration
@EnableBatchProcessing
public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";


    @Bean
    @StepScope
    @Value("#{stepExecutionContext['fileName']}")
    public FlatFileItemReader<T> reader(String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here
        return null;
    }

    //Exception when using JobScope for the writer
    @Bean

    public ItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }


    @Bean
    public Job loaderJob(JobBuilderFactory jobs, Step s1,
            JobExecutionListener listener) {
        return jobs.get(getLoaderName()).incrementer(new RunIdIncrementer())
                .listener(listener).start(s1).build();
    }

    @Bean
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor,
            ResourcePatternResolver resolver) {

        final Step readerStep = stepBuilderFactory
                .get(getLoaderName() + " ReadStep:slave").<T, T> chunk(25254)
                .reader(reader).processor(processor).writer(writer)
                .taskExecutor(taskExecutor).throttleLimit(16).build();

        final Step partitionedStep = stepBuilderFactory
                .get(getLoaderName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(getLoaderName() + " ReadStep:slave",
                        partitioner(resolver)).taskExecutor(taskExecutor)
                .build();

        return partitionedStep;

    }


    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    public Partitioner partitioner(
            ResourcePatternResolver resourcePatternResolver) {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    public JobExecutionListener listener(ItemWriter<T> writer) {
        /* org.springframework.batch.core.scope.StepScope scope; */
        return new JobCompletionNotificationListener<T>(writer);
    }

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String getLoaderName();

    public abstract String[] getColumnNames();

}

当我使用两个不同的作业参数运行作业的同一个实例时,两个实例将顺序运行,而不是并行运行。我配置了SimpleaySnCTaskExecutorbean,我认为它应该会导致异步触发作业。

我是否需要向该类添加更多的配置以使作业实例并行执行?

共有1个答案

欧阳哲
2023-03-14

您必须将用于启动作业的JobLauncher配置为使用TaskExecutor(或单独的池)。最简单的方法是重写bean:

@Bean
JobLauncher jobLauncher(JobRepository jobRepository) {
    new SimpleJobLauncher(
            taskExecutor: taskExecutor(),
            jobRepository: jobRepository)
}

不要被记录的警告所迷惑,警告说将使用同步任务执行器。这是由于Spring Batch在配置它在SimpleBatchConfiguration中提供的bean时使用了非常笨拙的方式而创建了一个额外的实例(长话短说,如果您想摆脱警告,您需要提供一个BatchConfigurerbean,并指定如何创建其他4个bean,即使您只想更改一个)。

请注意,这是相同的工作在这里是不相关的。问题是,默认情况下,作业启动程序将在同一线程上启动作业。

 类似资料:
  • 假设我在Hadoop环境中资源有限,我不想安排长时间运行的作业(即需要几天时间才能完成)。我正在分析大量过去的时间序列数据。我想安排一次需要一天数据的mapreduce作业(这需要一个小时来处理)。 那么,我如何安排,使新的工作提交后,前一个工作完成?

  • 我对TestNG注释的操作顺序有一个疑问。。。我有以下代码: 我的输出如下: 我的问题是,为什么@AfterTest方法没有在每个@Test注释之后运行?TestNG是否将整个类视为“Test”?似乎是这样,因为@BeforeTest和@AfterTest在@BeforeClass和@AfterClass之外,但我想确保我理解。我假设我可以在这种情况下使用@BeforeMethod和@AfterM

  • 我正在尝试对作业使用Spring批处理。我有两个作业tempJob和tempJob2在两个单独的配置中。当尝试使用命令行参数(-dspring.batch.job.names=tempJob)运行tempJob时,SpringBatch尝试运行tempJob两次,我得到以下错误 2018-06-15 11:36:37.956信息14436---[main]O.S.B.C.L.Support.Sim

  • 我正在用Spring Batch开发使用Spring Boot。 我使用了Spring Boot提供的最小配置,并定义了一些作业(根本没有XML配置)。但当我运行应用程序时, 作业按任意顺序依次执行。 我在注释类中以这种方式定义作业,其余的由Spring完成: 编辑:这个警告能给出一个提示吗?(也许没什么可做的)

  • 我有以下触发器配置: 我的工作可能超过5秒。 可行吗? 谢谢

  • 我正在使用Spring Boot+Spring Batch(注释),遇到了一个我必须运行2个作业的场景。 我有员工和工资记录,需要使用spring批处理更新。我已经按照本教程spring-batch入门教程为Employee和Salary对象配置了