spring batch 项目中多个job配置

孟洋
2023-12-01

使用 Spring Batch(下文简称 spb)跑批量时,会有多个 job 的场景,

并且需要根据实际情况选择指定的 job,比如 java -jar xxx.jar -参数,根据参数的不同来执行不同的 job。

配置job

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
 
import java.util.List;
import java.util.Map;
 
/**
 * Spring Batch configuration for the job named "jobOne".
 *
 * <p>Declares the job itself, its single chunk-oriented step ("step1") and the
 * reader, processor and writer that the step is assembled from. The reader,
 * processor and writer bodies are placeholders to be filled in with real logic.
 */
@Configuration
@EnableBatchProcessing
public class JobOneConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines "jobOne": a {@link RunIdIncrementer} plus {@link #step1()} as the
     * starting (and only) step.
     *
     * @return the assembled job
     */
    @Bean
    public Job jobOne() {
        Step firstStep = step1();
        return jobBuilderFactory
                .get("jobOne")
                .incrementer(new RunIdIncrementer())
                .start(firstStep)
                .build();
    }

    /**
     * Creates the cursor-based JDBC reader used by step1.
     *
     * @return an unconfigured reader; the actual reader setup is application-specific
     */
    @Bean
    public JdbcCursorItemReader<Map<String, String>> reader() {
        // Placeholder: configure the reader according to the actual requirements.
        JdbcCursorItemReader<Map<String, String>> cursorReader = new JdbcCursorItemReader<>();
        return cursorReader;
    }

    /**
     * Creates the processor that turns one row map into a string.
     *
     * @return a processor that currently yields an empty string for every item
     */
    public ItemProcessor<Map<String, String>, String> tableProcessor() {
        return row -> {
            // Placeholder: build the real output according to the actual requirements.
            String sql = "";
            return sql;
        };
    }

    /**
     * Creates the writer that receives each processed chunk.
     *
     * @return a writer that currently does nothing
     */
    @Bean
    public ItemWriter<String> writer() {
        return chunk -> {
            // Placeholder: write the chunk according to the actual requirements.
        };
    }

    /**
     * Defines "step1": reads, processes and writes in chunks of 100 items.
     *
     * @return the assembled step
     */
    @Bean
    @JobScope
    public Step step1() {
        JdbcCursorItemReader<Map<String, String>> source = reader();
        ItemProcessor<Map<String, String>, String> transformer = tableProcessor();
        ItemWriter<String> sink = writer();

        return stepBuilderFactory
                .get("step1")
                .<Map<String, String>, String>chunk(100)
                .reader(source)
                .processor(transformer)
                .writer(sink)
                .build();
    }
}

第二个job

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
 
import java.util.List;
import java.util.Map;
 
/**
 * Spring Batch configuration for the job named "jobTwo".
 *
 * <p>Declares the job itself, its single chunk-oriented step ("step2") and the
 * reader, processor and writer that the step is assembled from. The reader,
 * processor and writer bodies are placeholders to be filled in with real logic.
 */
@Configuration
@EnableBatchProcessing
public class JobTwoConfiguration {

    @Autowired
    private JobBuilderFactory job;

    @Autowired
    private StepBuilderFactory step;

    /**
     * Defines "jobTwo": a {@link RunIdIncrementer} plus {@link #step2()} as the
     * starting (and only) step.
     *
     * @return the assembled job
     */
    @Bean
    public Job jobTwo() {
        return job.get("jobTwo")
                .incrementer(new RunIdIncrementer())
                // This class defines step2(), not step1(); the job must start with step2().
                .start(step2())
                .build();
    }

    /**
     * Creates the cursor-based JDBC reader used by step2.
     *
     * @return an unconfigured reader; the actual reader setup is application-specific
     */
    @Bean
    public JdbcCursorItemReader<Map<String, String>> reader2() {
        JdbcCursorItemReader<Map<String, String>> reader = new JdbcCursorItemReader<>();
        // Placeholder: configure the reader according to the actual requirements.
        return reader;
    }

    /**
     * Creates the processor that turns one row map into a string.
     *
     * @return a processor that currently yields an empty string for every item
     */
    public ItemProcessor<Map<String, String>, String> tableProcessor2() {
        return new ItemProcessor<Map<String, String>, String>() {
            @Override
            public String process(Map<String, String> map) throws Exception {
                String insertSql = "";
                // Placeholder: build the real output according to the actual requirements.
                return insertSql;
            }
        };
    }

    /**
     * Creates the writer that receives each processed chunk.
     *
     * @return a writer that currently does nothing
     */
    @Bean
    public ItemWriter<String> writer2() {
        return new ItemWriter<String>() {
            @Override
            public void write(List<? extends String> list) throws Exception {
                // Placeholder: write the chunk according to the actual requirements.
            }
        };
    }

    /**
     * Defines "step2": reads, processes and writes in chunks of 100 items.
     *
     * @return the assembled step
     */
    @Bean
    @JobScope
    public Step step2() {
        return step.get("step2")
                .<Map<String, String>, String>chunk(100)
                .reader(reader2())
                .processor(tableProcessor2())
                .writer(writer2())
                .build();
    }
}

1. job封装到这个类,提供get方法

提供get方法 ctx.getBean()来选择

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.ApplicationContextFactory;
import org.springframework.batch.core.configuration.support.GenericApplicationContextFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * Enables batch processing in modular mode and exposes one
 * {@link ApplicationContextFactory} per job configuration class.
 */
@Configuration
@EnableBatchProcessing(modular = true)
public class JobConfiguration {

    /**
     * @return a context factory built from {@code JobOneConfiguration}
     */
    @Bean
    public ApplicationContextFactory oneJob() {
        ApplicationContextFactory jobOneFactory =
                new GenericApplicationContextFactory(JobOneConfiguration.class);
        return jobOneFactory;
    }

    /**
     * @return a context factory built from {@code JobTwoConfiguration}
     */
    @Bean
    public ApplicationContextFactory twoJob() {
        ApplicationContextFactory jobTwoFactory =
                new GenericApplicationContextFactory(JobTwoConfiguration.class);
        return jobTwoFactory;
    }
}

main函数代码(args判断选择哪个job执行)

import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.ConfigurableApplicationContext;
 
import java.io.IOException;
import java.util.Date;
 
@SpringBootApplication
@ComponentScan
@EnableAutoConfiguration
public class BatchApplication {
 
    public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException, IOException {
        boolean oneJob = false;
        boolean twoJob = false;
 
        SpringApplication application = new SpringApplication(BatchApplication.class);
        application.setWebEnvironment(false);
        ConfigurableApplicationContext ctx = application.run(args);
        JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
 
        //无参数时,都执行
        if (args.length == 0) {
            oneJob = true;
            twoJob = true;
        } else {
            for (String arg : args) {
                if (arg.toLowerCase().equals("?")) {
                    System.out.println("Usage : Batch [-b] [-r]");
                }
                if (arg.toLowerCase().equals("-o") || arg == null) {
                    oneJob = true;
                }
                if (arg.toLowerCase().equals("-t")) {
                    twoJob = true;
                }
            }
        }
        if (oneJob) {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addDate("date", new Date())
                    .toJobParameters();
            jobLauncher.run(ctx.getBean("jobOne", Job.class), jobParameters)
           //jobOne名称必须和JobOneConfiguration中配置的@bean Job 的方法名一致,后面jobTwo也是一样。
        }
        if (twoJob) {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addDate("date", new Date())          
                    .toJobParameters();
            jobLauncher.run(ctx.getBean("jobTwo", Job.class), jobParameters);
        }
 
        System.exit(0);
 
    }
}

以上代码完成之后,发现启动时不管参数是 -o 还是 -t,两个 job 都会被执行。这是因为 Spring Boot 在启动时默认会自动执行上下文中的所有 job,需要在配置文件(如 application.properties)中加入:

spring.batch.job.enabled=false

该配置表示 spb 加载后不再自动执行 job,改由 main 函数根据参数决定执行哪个 job。

 类似资料: