Spring Batch与Elastic-Job综合运用

麹承
2023-12-01

简介

本文将 Spring Batch 与 Elastic-Job 进行整合。两者各自的基础用法可以参考之前的文章,这里的重点是:如何根据 Elastic-Job 的分片,把不同的业务(batch job)分配给不同的分片去执行。

开始构建

  • 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <!-- Maven build for the spring-job demo: Spring Boot 1.5.12 parent plus Elastic-Job Lite 2.1.5 and Spring Batch. -->
   <modelVersion>4.0.0</modelVersion>
   <parent>
   	<groupId>org.springframework.boot</groupId>
   	<artifactId>spring-boot-starter-parent</artifactId>
   	<version>1.5.12.RELEASE</version>
   	<relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.gs</groupId>
   <artifactId>spring-job</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>spring-job</name>
   <description>Demo project for Spring Boot</description>

   <properties>
   	<java.version>1.8</java.version>
   </properties>

   <dependencies>
   	<!-- Elastic-Job Lite core module -->
   	<dependency>
   		<groupId>com.dangdang</groupId>
   		<artifactId>elastic-job-lite-core</artifactId>
   		<version>2.1.5</version>
   	</dependency>
   	<!-- Required when using the custom Spring namespace of Elastic-Job -->
   	<dependency>
   		<groupId>com.dangdang</groupId>
   		<artifactId>elastic-job-lite-spring</artifactId>
   		<version>2.1.5</version>
   	</dependency>

   	<!-- Spring Boot starters (versions managed by the parent POM) -->
   	<dependency>
   		<groupId>org.springframework.boot</groupId>
   		<artifactId>spring-boot-starter-data-jpa</artifactId>
   	</dependency>
   	<dependency>
   		<groupId>org.springframework.boot</groupId>
   		<artifactId>spring-boot-starter-web</artifactId>
   	</dependency>
   	<!-- Druid connection pool starter -->
   	<dependency>
   		<groupId>com.alibaba</groupId>
   		<artifactId>druid-spring-boot-starter</artifactId>
   		<version>1.1.2</version>
   	</dependency>
   	<dependency>
   		<groupId>org.projectlombok</groupId>
   		<artifactId>lombok</artifactId>
   	</dependency>
   	<dependency>
   		<groupId>org.springframework.boot</groupId>
   		<artifactId>spring-boot-starter-test</artifactId>
   		<scope>test</scope>
   	</dependency>

   	<!-- MySQL JDBC driver and Spring JDBC support -->
   	<dependency>
   		<groupId>mysql</groupId>
   		<artifactId>mysql-connector-java</artifactId>
   		<version>8.0.11</version>
   	</dependency>
   	<dependency>
   		<groupId>org.springframework.boot</groupId>
   		<artifactId>spring-boot-starter-jdbc</artifactId>
   	</dependency>

   	<!-- Elastic-Job Lite lifecycle module -->
   			<dependency>
                   <groupId>com.dangdang</groupId>
                   <artifactId>elastic-job-lite-lifecycle</artifactId>
                   <version>2.1.5</version>
               </dependency>
   	<!-- Spring Batch starter -->
   	<dependency>
   		<groupId>org.springframework.boot</groupId>
   		<artifactId>spring-boot-starter-batch</artifactId>
   	</dependency>
   	<!-- NOTE(review): fastjson 1.2.39 is an old release; check it against current security advisories before reuse. -->
   	<dependency>
   		<groupId>com.alibaba</groupId>
   		<artifactId>fastjson</artifactId>
   		<version>1.2.39</version>
   	</dependency>
   </dependencies>

   <build>
   	<plugins>
   		<!-- Spring Boot Maven plugin -->
   		<plugin>
   			<groupId>org.springframework.boot</groupId>
   			<artifactId>spring-boot-maven-plugin</artifactId>
   		</plugin>
   	</plugins>
   </build>

</project>

  • 配置文件
# HTTP port of the application
server.port=9087
# ZooKeeper cluster (server list and namespace)
# NOTE(review): the key prefix is spelled "elaticjob"; keep it in sync with whatever code reads these keys
elaticjob.zookeeper.server-lists=127.0.0.1:2181
elaticjob.zookeeper.namespace=my-project

# Data source used mainly to store job execution logs
# NOTE(review): the next line has no '=' or value and looks truncated; confirm the intended property
spring.datasource.druid.us
spring.datasource.druid.log.url=jdbc:mysql://127.0.0.1:3306/gsong?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8
spring.datasource.druid.log.username=root
spring.datasource.druid.log.password=123456
spring.datasource.druid.log.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.druid.log.maxActive=16
# Automatically create / update / validate the database schema
spring.jpa.hibernate.ddl-auto=update
spring.jpa.database=mysql
spring.jpa.show-sql=true
# With this set to false, batch jobs are not started automatically at application startup
spring.batch.job.enabled=false

  • batch相关代码

reader代码

  /**
   * Builds the flat-file reader that maps one text line to one {@code User}.
   *
   * <p>Lines are decoded as GBK and split on single spaces; blank tokens are dropped. The first
   * three remaining tokens become name, password and email, and any missing token defaults to a
   * single space. A {@code null} or empty line yields a {@code User} with no fields set.
   *
   * <p>No resource is set here.
   *
   * @return the configured reader
   * @throws Exception kept in the signature for compatibility with existing callers
   */
  @Bean("reader")
    public  FlatFileItemReader<User> reader()throws Exception {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setEncoding("GBK");
        reader.setLineMapper(new LineMapper<User>() {
            @Override
            public User mapLine(String line, int lineNumber) throws Exception {
                if (line == null || line.isEmpty()) {
                    return new User();
                }
                List<String> tokens = Arrays.stream(line.split(" "))
                        .filter(token -> !token.trim().isEmpty())
                        .collect(Collectors.toList());
                // Plain instantiation instead of double-brace initialization: the latter creates an
                // anonymous User subclass that holds a reference to this mapper for every line read.
                User user = new User();
                user.setName(tokens.size() >= 1 ? tokens.get(0) : " ");
                user.setPassword(tokens.size() >= 2 ? tokens.get(1) : " ");
                user.setEmail(tokens.size() >= 3 ? tokens.get(2) : " ");
                return user;
            }
        });

        return reader;
    }
  /**
   * Builds a multi-resource reader over the classpath files {@code tianya_<n>.txt} for every odd
   * {@code n} from 1 to 49, except 35, delegating line parsing to the given flat-file reader.
   *
   * @param reader the delegate that reads each individual resource
   * @return the configured multi-resource reader
   */
  @Bean("listResourceMq")
  public  MultiResourceItemReader<User> listResourceMq(@Qualifier("reader") FlatFileItemReader<User> reader ){
      List<Resource> resources = new ArrayList<>();
      // Odd indices only (1, 3, ..., 49); index 35 is deliberately left out.
      for (int index = 1; index < 51; index += 2) {
          if (index == 35) {
              continue;
          }
          resources.add(new ClassPathResource("tianya_" + index + ".txt"));
      }

      MultiResourceItemReader<User> multiResourceItemReader = new MultiResourceItemReader<User>();
      multiResourceItemReader.setDelegate(reader);
      // toArray with a typed array replaces the manual Object[] -> Resource[] copy loop.
      multiResourceItemReader.setResources(resources.toArray(new Resource[0]));
      return multiResourceItemReader;
  }

processor代码实现

    /**
     * Creates the item processor: a {@code CsvItemProcessor} whose validator is the one returned
     * by {@code csvBeanValidator()}.
     *
     * @return the processor with its validator attached
     */
    @Bean
    public ItemProcessor<User,User> processor(){
        // Our own ItemProcessor implementation.
        CsvItemProcessor csvProcessor = new CsvItemProcessor();
        // Attach the bean validator before handing the processor out.
        csvProcessor.setValidator(csvBeanValidator());
        return csvProcessor;
    }

writer代码实现

    /**
     * Creates the JDBC batch writer that inserts each {@code User} into table {@code user3},
     * binding the named SQL parameters from the item's bean properties.
     *
     * <p>NOTE(review): the statement binds {@code :name}, {@code :password}, {@code :email} to the
     * columns {@code gamename}, {@code name}, {@code email} in that order; confirm this mapping
     * is intended.
     *
     * @return the configured writer
     */
    @Bean
    public ItemWriter<User> writer(){
        // JdbcBatchItemWriter writes the items to the database in JDBC batches.
        JdbcBatchItemWriter<User> jdbcWriter = new JdbcBatchItemWriter<>();
        jdbcWriter.setDataSource(dataSource);
        // The SQL statement executed for every item of the batch.
        jdbcWriter.setSql("insert into user3  (gamename,name,email)  values(:name,:password,:email)");
        jdbcWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return jdbcWriter;
    }

配置batch step

 /**
  * Defines the batch step "step1": read users from the multi-resource reader, process them and
  * write them out in chunks of 65000, skipping up to 10000 failing items, on a
  * {@code SimpleAsyncTaskExecutor} with a throttle limit of 16.
  *
  * <p>NOTE(review): the step runs on multiple threads while reading through a
  * {@code MultiResourceItemReader}/{@code FlatFileItemReader}; confirm these readers are safe to
  * share across threads in the Spring Batch version in use, or synchronize the reader.
  *
  * @param stepBuilderFactory factory used to create the step builder
  * @param listResourceMq reader spanning all input resources
  * @param writer writer bound to the step
  * @param processor processor bound to the step
  * @return the configured step
  */
 @Bean("step1")
    public Step step1(StepBuilderFactory stepBuilderFactory, @Qualifier("listResourceMq") MultiResourceItemReader<User> listResourceMq, ItemWriter<User> writer,
                      ItemProcessor<User,User> processor){
        return stepBuilderFactory
                .get("step1") // the step name must be a String; the bare identifier step1 is not declared in this method
                .<User, User>chunk(65000) // commit every 65000 items
                .reader(listResourceMq) // bind the reader
                .faultTolerant()
                .skip(NullPointerException.class)
                .skip(Exception.class)
                .skipLimit(10000)
                .processor(processor) // bind the processor
                .writer(writer) // bind the writer
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .throttleLimit(16)
                .build();
    }

配置batch job

    /**
     * Defines the batch job "importJob": a single-step flow over the given step, with a
     * {@code RunIdIncrementer} and the listener returned by {@code csvJobListener()}.
     *
     * @param jobBuilderFactory factory used to create the job builder
     * @param s1 the step the job executes
     * @return the configured job
     */
    @Bean("importJob")
    public Job importJob(JobBuilderFactory jobBuilderFactory, @Qualifier("step1") Step s1){
        RunIdIncrementer runIdIncrementer = new RunIdIncrementer();
        return jobBuilderFactory
                .get("importJob")
                .incrementer(runIdIncrementer)
                // The job consists of exactly this one step.
                .flow(s1)
                .end()
                // Register the job listener.
                .listener(csvJobListener())
                .build();
    }
  • Elastic-job相关代码

job配置

    /**
     * Builds the scheduler for "demoSimpleJob": cron {@code 0 *}{@code /3 * * * ?}, two sharding
     * items with parameters {@code 0=A,1=B}, implemented by {@code SimpleJobImpl}.
     *
     * <p>No job event configuration is passed to the scheduler here; the original note says event
     * tracing would write job execution information to the database.
     *
     * @return the scheduler wired with the job instance, registry center, Lite configuration and listener
     * @throws IOException declared by the original signature
     */
    public SpringJobScheduler instanceSimpleJob() throws IOException {
        // SIMPLE-type job configuration wrapping the core settings (name, cron, shard count, shard parameters).
        SimpleJobConfiguration jobTypeConfig = new SimpleJobConfiguration(
                JobCoreConfiguration
                        .newBuilder("demoSimpleJob", "0 */3 * * * ?", 2)
                        .shardingItemParameters("0=A,1=B")
                        .build(),
                SimpleJobImpl.class.getCanonicalName());
        // Root configuration for the Lite job.
        LiteJobConfiguration liteConfig = LiteJobConfiguration.newBuilder(jobTypeConfig).build();
        return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, liteConfig, customerMoreJobListener);
    }

> job实现类,这里是连接springbatch的核心

/**
 * Elastic-Job simple job that picks the work to do from the sharding item; this is where
 * Elastic-Job is connected to Spring Batch.
 *
 * <p>Sharding item 0 corresponds to "A" ({@code importJob1}) and item 1 to "B"
 * ({@code importJob}). The actual job launches are left commented out in this demo. A single
 * JVM-wide guard lets only the first shard execution that arrives do its work; every later
 * execution of either shard is a no-op.
 *
 * <p>Created 2019/1/5 23:01, project spring-job.
 *
 * @author 高松
 */
@Component
public class SimpleJobImpl implements SimpleJob
{
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    @Qualifier("importJob")
    Job importJob;
    @Autowired
    @Qualifier("importJob1")
    Job importJob1;

    /**
     * Run-once guard shared by all shards in this JVM. An AtomicBoolean is used so that the
     * "check and set" is one atomic step; with a plain volatile boolean two shard threads could
     * both observe false and both proceed.
     */
    private static final java.util.concurrent.atomic.AtomicBoolean flag =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    /**
     * Runs the work assigned to the current sharding item, at most once per JVM across all shards.
     *
     * @param shardingContext sharding information supplied by Elastic-Job
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()){
            case 0:
                if (flag.compareAndSet(false, true)) {
                    try {
                        System.out.println("执行了A");
                        // Run the batch job that belongs to this shard, i.e. this shard's own business logic.
  /*                      JobParameters  jobParameters = new JobParametersBuilder()
                                .addDate("start",new Date())
                                .toJobParameters();
                        jobLauncher.run(importJob1,jobParameters);*/
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                break;
            case 1:
                if (flag.compareAndSet(false, true)) {
                    try {
                        System.out.println("执行了B");
                        // Run the batch job that belongs to this shard, i.e. this shard's own business logic.
/*
                    JobParameters jobParameters = new JobParametersBuilder()
                                .addDate("start", new Date())
                                .toJobParameters();
                        jobLauncher.run(importJob, jobParameters);*/
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                break;
            default:
                System.out.println("尚未分片!");
                break;

        }
    }
}

源码地址:链接:https://pan.baidu.com/s/1RYlQulxyyw2P5iNbtMaN_A
提取码:2ay1

 类似资料: