spring-batch - 从数据库分页读取数据然后输出

蔡楚
2023-12-01

一、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-batch-demo</artifactId>
        <groupId>com.joandora</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-batch-integration</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.core.version>4.2.0.RELEASE</spring.core.version>
        <spring.data.jpa.version>1.7.1.RELEASE</spring.data.jpa.version>
        <spring.batch.version>3.0.7.RELEASE</spring.batch.version>
        <cglib.version>2.2</cglib.version>
        <aspectj.version>1.8.2</aspectj.version>
        <c3p0.version>0.9.1.2</c3p0.version>
        <querydsl.version>2.2.5</querydsl.version>
        <slf4j.version>1.7.13</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <!-- Testing -->
        <junit.version>4.12</junit.version>
        <!-- Plugins -->
        <maven.copy.plugin.version>0.2.3</maven.copy.plugin.version>
        <maven.compiler.plugin.version>2.3.2</maven.compiler.plugin.version>
        <maven.apt.plugin.version>1.0</maven.apt.plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Batch -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <!-- A seamless aspect-oriented extension to the Java programming language -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <!-- Cglib is a powerful, high performance and quality Code Generation Library,
        It is used to extend JAVA classes and implements interfaces at runtime.  -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib-nodep</artifactId>
            <version>${cglib.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Logger -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- The Simple Logging Facade for Java or (SLF4J) serves as a simple facade or abstraction
        for various logging frameworks, e.g. java.util.logging, log4j and logback, allowing the end
        user to plug in the desired logging framework at deployment time. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-jpa</artifactId>
            <version>${spring.data.jpa.version}</version>
        </dependency>

        <!-- Database pooling -->
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>${c3p0.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Testing dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
        <!-- HSQLDB -->
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <version>2.3.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <!-- Compiles for Java 1.6. The plugin version is taken from the
                 maven.compiler.plugin.version property declared in <properties>
                 so that it is defined in exactly one place. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin.version}</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>

            <!-- Builds the jar with a Class-Path entry in its manifest. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

 

二、spring总配置文件

    applicationContext.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	   		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	   		http://www.springframework.org/schema/context
	   		http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    <!-- Root application context. The mvc namespace was dropped: no mvc element is used
         here and spring-webmvc is not a dependency of this module. -->

    <!-- Load spring.properties and expose its entries as ${...} placeholders. -->
    <context:property-placeholder properties-ref="springProperties" />
    <bean id="springProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:spring.properties" />

    <!-- annotation-config would process @Required, @Autowired, @PreDestroy, @Resource and
         similar annotations (not @Transactional). It stays disabled because component-scan
         below already enables the same processing. -->
    <!--<context:annotation-config />-->

    <!-- Register every @Component, @Repository, @Service and @Controller class found under
         the base package as a Spring bean. -->
    <context:component-scan base-package="com.joandora.spring.batch" />

    <import resource="spring-data.xml"/>
    <import resource="spring-batch.xml"/>
    <import resource="spring-batch-job.xml"/>
</beans>

 

   spring.properties

# database properties
#app.jdbc.driverClassName=com.mysql.jdbc.Driver
app.jdbc.driverClassName=org.hsqldb.jdbcDriver
#app.jdbc.url=jdbc:mysql://localhost/physiandb
app.jdbc.url=jdbc:hsqldb:mem:physiandb;sql.enforce_strict_size=true;hsqldb.tx=mvcc
#app.jdbc.username=root
app.jdbc.username=sa
#app.jdbc.password=root
app.jdbc.password=

# batch properties
job.commit.interval=8
job.skip_limit=10

 

三、数据库配置

 

   spring-data.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:jpa="http://www.springframework.org/schema/data/jpa"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd">
	<!-- Resolve ${...} placeholders from the springProperties bean (the properties file). -->
	<context:property-placeholder properties-ref="springProperties" />
	<!-- Enable @Transactional, backed by the transactionManager bean below. -->
	<tx:annotation-driven transaction-manager="transactionManager" />
	<!-- c3p0 pooled JDBC data source; driver, URL and credentials come from the
	     app.jdbc.* properties. The pool is closed when the context is destroyed. -->
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
		destroy-method="close"
		p:driverClass="${app.jdbc.driverClassName}"
		p:jdbcUrl="${app.jdbc.url}"
		p:user="${app.jdbc.username}"
		p:password="${app.jdbc.password}"
		p:acquireIncrement="5"
		p:idleConnectionTestPeriod="60"
		p:maxPoolSize="100"
		p:maxStatements="50"
		p:minPoolSize="10" />
		
	<!-- JdbcTemplate bound to the pooled data source. -->
	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
		p:dataSource-ref="dataSource" />

	<!-- Plain JDBC transaction manager bound to the same data source. -->
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
       			p:dataSource-ref="dataSource" />
       			

	<!-- Scripts executed against dataSource at startup, in this order: drop the Spring Batch
	     metadata tables, recreate them, then create and fill the demo table (blacklist.sql). -->
	<jdbc:initialize-database data-source="dataSource">
		<jdbc:script location="classpath*:/org/springframework/batch/core/schema-drop-hsqldb.sql" />
		<jdbc:script location="classpath*:/org/springframework/batch/core/schema-hsqldb.sql" />
		<jdbc:script location="classpath:blacklist.sql" />
	</jdbc:initialize-database>
</beans>

 

    blacklist.sql,程序启动将执行此脚本

 

 

-- Demo table read by the paging reader: a single NAME column that is also the primary key.
CREATE TABLE T_BLACKLIST (
    NAME VARCHAR(100) NOT NULL PRIMARY KEY
);

-- Ten sample rows, tb1 .. tb10.
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb1');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb2');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb3');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb4');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb5');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb6');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb7');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb8');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb9');
INSERT INTO T_BLACKLIST (NAME) VALUES ('tb10');
 

 

 四、spring-batch配置

 

   spring-batch.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xmlns:aop="http://www.springframework.org/schema/aop"
       	xmlns:batch="http://www.springframework.org/schema/batch"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd
			http://www.springframework.org/schema/aop 
			http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
			http://www.springframework.org/schema/batch 
			http://www.springframework.org/schema/batch/spring-batch.xsd">
	<!-- Job launcher used to run jobs; it hands execution to taskExecutor. -->
	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
		p:jobRepository-ref="jobRepository" 
		p:taskExecutor-ref="taskExecutor"/>
	
	<!-- Executor used by the launcher. NOTE: SimpleAsyncTaskExecutor starts a new thread
	     for each task; it is not a pooling executor. -->
	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	
	<!-- Job repository that persists job metadata through dataSource. -->
	<batch:job-repository id="jobRepository"
	    data-source="dataSource"
	    isolation-level-for-create="DEFAULT"
	    transaction-manager="transactionManager"/>
</beans>
 

 

 

 五、job配置

 

    spring-batch-job.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xmlns:aop="http://www.springframework.org/schema/aop"
       	xmlns:batch="http://www.springframework.org/schema/batch"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd
			http://www.springframework.org/schema/aop 
			http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
			http://www.springframework.org/schema/batch 
			http://www.springframework.org/schema/batch/spring-batch.xsd">
	<!-- Thread pool that executes the chunks of addBlackListStep. -->
	<bean id="taskletExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="5"/>
		<property name="maxPoolSize" value="10"/>
		<property name="queueCapacity" value="30"/>
	</bean>
	<!-- Single-step job: read with blackListReader, transform with blackListProcessor, write
	     with blackListWriter, committing every 2 items. Setting task-executor on the tasklet
	     makes this a multi-threaded step. -->
	<batch:job id="blackListJob">
		<batch:step id="addBlackListStep">
			<batch:tasklet task-executor="taskletExecutor">
				<batch:chunk reader="blackListReader" processor="blackListProcessor"
							 writer="blackListWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- Step-scoped paging reader: selects NAME from T_BLACKLIST where NAME matches the
	     "name" job parameter, ordered by NAME, and maps each row with blackListRowMapper. -->
	<bean id="blackListReader"  scope="step" class="org.springframework.batch.item.database.JdbcPagingItemReader">
		<property name="dataSource" ref="dataSource" />
		<!-- addBlackListStep runs this reader on a task executor (multi-threaded step).
		     JdbcPagingItemReader requires saveState=false in that setup; as a consequence
		     the step cannot resume from the last page after a failure. -->
		<property name="saveState" value="false" />
		<property name="queryProvider">
			<bean
					class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
				<property name="dataSource" ref="dataSource" />
				<property name="selectClause" value="select NAME" />
				<property name="fromClause" value="from T_BLACKLIST" />
				<property name="whereClause"
						  value="where NAME like :name" />
				<property name="sortKey" value="NAME" />
			</bean>
		</property>
		<!-- Binds the :name placeholder of the where clause to the "name" job parameter. -->
		<property name="parameterValues">
			<map>
				<entry key="name" value="#{jobParameters['name']}" />
			</map>
		</property>
		<!-- Number of rows fetched by each page query. -->
		<property name="pageSize" value="2" />
		<property name="rowMapper" ref="blackListRowMapper" />
	</bean>
</beans>
 

 

六、启动类

    com.joandora.spring.batch.server.Startup

 

/**
 * Command-line entry point: boots the Spring context from {@code applicationContext.xml}
 * and asks the {@code blackListLoader} bean to launch the black-list job.
 */
public class Startup {
	/**
	 * Starts the application.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if the context cannot be created or the loader fails
	 */
	public static void main(String[] args) throws Exception {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
		// Close the context when the JVM shuts down so destroy-method callbacks (for example
		// the pooled data source) run. It is not closed right after loadBlackList() because
		// jobLauncher is configured with an asynchronous task executor, so the job may still
		// be running when that call returns.
		context.registerShutdownHook();

		// Typed lookup instead of an unchecked cast.
		BlackListLoader blackListLoaderTask = context.getBean("blackListLoader", BlackListLoader.class);
		blackListLoaderTask.loadBlackList();
	}
}
 

 

   启动job的类:com.joandora.spring.batch.loader.BlackListLoader

/**
 * Launches {@code blackListJob} through the injected {@link JobLauncher}, passing the
 * job parameter {@code name} with the value {@code %tb%}.
 */
@Component
public class BlackListLoader {
    private static final Logger LOG = LoggerFactory.getLogger(BlackListLoader.class);

    @Resource
    private Job blackListJob;

    @Resource
    private JobLauncher jobLauncher;

    /**
     * Runs the job once. An already-completed job instance is only logged at debug level;
     * any other failure is rethrown wrapped in a {@link RuntimeException}.
     */
    public void loadBlackList() {
        try {
            jobLauncher.run(
                    blackListJob,
                    new JobParametersBuilder()
                            .addString("name", "%tb%")
                            .toJobParameters());
        } catch (JobInstanceAlreadyCompleteException alreadyComplete) {
            LOG.debug("This job has been completed already!");
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
        System.out.println("over----------------");
    }
}

    解析:

    1、定义了一个名称为name的job参数,值为%tb%。reader中的SQL条件 where NAME like :name 会用它替换 :name 占位符,从而实现模糊查询。

 

 七、reader

 

<!-- Step-scoped paging reader: selects NAME from T_BLACKLIST where NAME matches the
     "name" job parameter, ordered by NAME, and maps each row with blackListRowMapper. -->
<bean id="blackListReader"  scope="step" class="org.springframework.batch.item.database.JdbcPagingItemReader">
		<property name="dataSource" ref="dataSource" />
		<!-- The step that uses this reader is configured with a task executor (multi-threaded
		     step). JdbcPagingItemReader requires saveState=false in that setup; as a
		     consequence the step cannot resume from the last page after a failure. -->
		<property name="saveState" value="false" />
		<property name="queryProvider">
			<bean
					class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
				<property name="dataSource" ref="dataSource" />
				<property name="selectClause" value="select NAME" />
				<property name="fromClause" value="from T_BLACKLIST" />
				<property name="whereClause"
						  value="where NAME like :name" />
				<property name="sortKey" value="NAME" />
			</bean>
		</property>
		<!-- Binds the :name placeholder of the where clause to the "name" job parameter. -->
		<property name="parameterValues">
			<map>
				<entry key="name" value="#{jobParameters['name']}" />
			</map>
		</property>
		<!-- Number of rows fetched by each page query. -->
		<property name="pageSize" value="2" />
		<property name="rowMapper" ref="blackListRowMapper" />
	</bean>
   解析:

 

   1、使用spring-batch已实现的分页类:SqlPagingQueryProviderFactoryBean

 2、selectClause、fromClause、whereClause组合成一条sql。

 3、sortKey很重要:分页查询依靠它确定每一页的起始位置,因此应使用唯一键(例如主键)排序;否则在翻页或任务失败后重新启动时,可能出现记录重复或遗漏。

 

 com.joandora.spring.batch.mapper.BlackListRowMapper

 

@Component
public class BlackListRowMapper implements RowMapper<BlackListEntity> {

    public BlackListEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
        BlackListEntity blackList = new BlackListEntity();
        blackList.setName(rs.getString("NAME"));
        return blackList;
    }
}
 

 

八、Processor

   com.joandora.spring.batch.processor.BlackListProcessor

 

/**
 * Item processor that tags every black-list entry by appending {@code -ByProcessor}
 * to its name.
 */
@Component
public class BlackListProcessor implements ItemProcessor<BlackListEntity,BlackListEntity> {

    /**
     * Appends the marker suffix to the item's name.
     *
     * @param item the entity read from the database; it is modified in place
     * @return the same instance, with the suffixed name
     */
    @Override
    public BlackListEntity process(BlackListEntity item) throws Exception {
        final String taggedName = item.getName() + "-ByProcessor";
        item.setName(taggedName);
        return item;
    }
}
 

 

九、writer

   com.joandora.spring.batch.writer.BlackListWriter

/**
 * Item writer that prints the name of every black-list entry in the chunk to standard output.
 */
@Component
public class BlackListWriter  implements ItemWriter<BlackListEntity> {

    // Logger is bound to this class (it was previously bound to BlackListEntity by mistake).
    private static final Logger LOG = LoggerFactory.getLogger(BlackListWriter.class);

    /**
     * Prints one line per entity.
     *
     * @param blackListEntities the items of the current chunk
     * @throws RuntimeException if an item cannot be written; the failure is logged and then
     *         rethrown so the caller sees the chunk as failed instead of silently succeeding
     */
    @Override
    public void write(List<? extends BlackListEntity> blackListEntities) {
        try {
            for (BlackListEntity blackList : blackListEntities) {
                System.out.println("spring batch writer show"+blackList.getName());
            }
        } catch (RuntimeException ple) {
            // Log with the full stack trace, then propagate rather than swallow.
            LOG.error("Failed to write black-list chunk", ple);
            throw ple;
        }
    }
}

 

 类似资料: