spring-batch批处理框架介绍及demo

葛兴发
2023-12-01

最近在项目中,有批处理的相关需求。最终选定技术框架为spring-batch批处理框架,在此专栏中为大家分享spring-batch的基础知识与项目遇到的一些实际问题的解决方案。

 1.框架介绍

spring-batch主要分为job和step。step中包含reader、processor、writer等主要组件,并且可以设置为串行、并行或多线程执行。框架还提供了大量监听器,用于监听每一个细分步骤的执行,同时内置了批处理过程中常用的重试机制。

spring-batch拥有很强大的读取器,平面文件、xml文件、数据库等数据源都能进行读取,并且支持自定义读取器。同时写入器也支持平面文件及数据库等各种类型的数据写入。

2.适用场景

spring-batch适用于数据同步、数据整合统计、数据迁移等场景

主要开发步骤为:数据获取(读)->数据处理->数据写入(写)

3.一个简单的spring-batch demo。采用spring-boot+spring-batch+mysql。

依赖引入可以参考官网教程

下面主要展示yml配置和基本代码

yml配置spring-batch及mysql数据库

spring:
  batch:
    job:
      # Jobs defined in the context run automatically at startup by default (true);
      # set to false so they only run when launched explicitly via jobLauncher.run.
      # This key must be nested under "job" (spring.batch.job.enabled) to take effect.
      enabled: false
    # Let Spring Batch create its default metadata tables in the database;
    # with a value other than always, errors about missing tables are reported.
    # NOTE(review): on Spring Boot 2.5+ this key moved to spring.batch.jdbc.initialize-schema - confirm against the Boot version in use.
    initialize-schema: always
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/job_batch?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=UTC
    username: root
    password: 123456
    hikari:
      connection-test-query: select 1

准备需要读取的平面文件

userId,userName
201907190921,周一
201907190922,赵二
201907190923,张三
201907190924,李四
201907190925,王五
201907190926,闫六
201907190927,刘七
201907190928,胡八

编写reader

package com.flight.neon.batch.demo.job.reder;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

/**
 * Declares the flat-file reader that supplies {@link User} items to the demo job.
 *
 * @author 这个码农不太萌
 */
@Component
public class DemoReader {

    /**
     * Builds the reader for the classpath resource {@code text/user.txt}.
     *
     * <p>The first line of the file (the header) is skipped. Each remaining line is
     * tokenized into the fields {@code userId} and {@code userName} and mapped onto
     * a new {@link User}.
     *
     * @return the configured reader, registered as the bean {@code userReader}
     */
    @Bean("userReader")
    public FlatFileItemReader<User> userReader(){
        ClassPathResource classPathResource = new ClassPathResource("text/user.txt");
        FlatFileItemReader<User> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setResource(classPathResource);
        // Pin the charset: the sample file contains Chinese names, and relying on the
        // platform default charset would garble them on hosts that are not UTF-8.
        flatFileItemReader.setEncoding("UTF-8");
        // Skip the header line.
        flatFileItemReader.setLinesToSkip(1);
        // Line-to-object conversion: name the tokenized columns, then map them by name.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"userId","userName"});
        DefaultLineMapper<User> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(tokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSet -> {
            User user = new User();
            user.setUserId(fieldSet.readString("userId"));
            user.setUserName(fieldSet.readString("userName"));
            return user;
        });
        // The mapper is not a Spring bean itself, so its validation hook is invoked manually.
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }

}

编写process

package com.flight.neon.batch.demo.job.processor;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Hosts the item processor used between the reader and the writer of the demo job.
 *
 * @author 这个码农不太萌
 */
@Component
public class DemoProcess {

    /**
     * Provides the processor registered under the bean name {@code orgInfoProcess}.
     *
     * @return a processor that hands every {@link User} on unchanged
     */
    @Bean("orgInfoProcess")
    public ItemProcessor<User, User> userProcess() {
        // Data processing would go here; for now each item passes through as-is.
        return user -> user;
    }

}

编写writer

package com.flight.neon.batch.demo.job.writer;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * Declares the JDBC writer that stores {@link User} items in the {@code user} table.
 *
 * @author 这个码农不太萌
 */
@Configuration
public class DemoWriter {

    @Autowired
    DataSource dataSource;

    /**
     * Builds the batch writer registered under the bean name {@code userWriter}.
     *
     * <p>The named parameters {@code :userId} and {@code :userName} in the insert
     * statement are resolved from the bean properties of each written item.
     *
     * @return the configured writer
     */
    @Bean("userWriter")
    public JdbcBatchItemWriter<User> userWriter() {
        // Insert statement with named parameters bound from the item's properties.
        final String insertSql = "insert into user (user_id,user_name) values (:userId,:userName)";

        JdbcBatchItemWriter<User> jdbcWriter = new JdbcBatchItemWriter<>();
        jdbcWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        jdbcWriter.setSql(insertSql);
        // Target database.
        jdbcWriter.setDataSource(dataSource);
        return jdbcWriter;
    }

}

编写step及job

package com.flight.neon.batch.demo.job;

import com.flight.neon.batch.demo.job.entity.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Wires the reader, processor and writer beans into a single-step job.
 *
 * @author 这个码农不太萌
 */
@Component
public class DemoJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("userReader")
    FlatFileItemReader<User> userReader;

    @Autowired
    @Qualifier("orgInfoProcess")
    ItemProcessor<User, User> orgInfoProcess;

    @Autowired
    @Qualifier("userWriter")
    JdbcBatchItemWriter<User> userWriter;

    /**
     * Defines the job named {@code userJob}, which consists solely of {@link #userStep()}.
     *
     * @return the assembled job
     */
    @Bean
    public Job userJob() {
        Step onlyStep = userStep();
        return jobBuilderFactory
                .get("userJob")
                .start(onlyStep)
                .build();
    }

    /**
     * Builds the chunk-oriented step named {@code userStep}: read, process, write,
     * with a chunk size of 100 items.
     *
     * @return the assembled step
     */
    public Step userStep() {
        final int chunkSize = 100;
        return stepBuilderFactory
                .get("userStep")
                .<User,User>chunk(chunkSize)
                .reader(userReader)
                .processor(orgInfoProcess)
                .writer(userWriter)
                .build();
    }

}

编写单元测试

package com.flight.neon.batch.demo.test;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Integration test that launches {@code userJob} inside the Spring Boot test context.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoJobTest {

    // Injected by interface: the launcher bean may be exposed as a proxy of JobLauncher,
    // in which case injection by the concrete SimpleJobLauncher class would not resolve.
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job userJob;

    /**
     * Runs the job once and verifies that it finished successfully.
     *
     * @throws JobParametersInvalidException if the job rejects the parameters
     * @throws JobExecutionAlreadyRunningException if an execution of this instance is already running
     * @throws JobRestartException if the job cannot be restarted
     * @throws JobInstanceAlreadyCompleteException if this instance has already completed
     */
    @Test
    public void demoTest() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // A timestamp parameter makes every run a new job instance, so the test can be repeated.
        JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(userJob, jobParameters);
        // The outcome of the run is carried by the returned execution, so check it
        // explicitly; otherwise the test cannot detect a job that did not complete.
        Assert.assertEquals(BatchStatus.COMPLETED, execution.getStatus());
    }

}

运行后,可在数据库中看到录入数据

 类似资料: