spring-batch - 从csv读取数据放入数据库

艾昊明
2023-12-01

 spring-batch 应用:从csv读取数据写入到数据库

 

一、spring配置文件

 

applicationContext.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:p="http://www.springframework.org/schema/p" 
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	   		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	   		http://www.springframework.org/schema/context
	   		http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/mvc 
			http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd">
	<!-- 读取properties文件 -->
	<context:property-placeholder properties-ref="springProperties" />
	<bean id="springProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
		  p:location="classpath:spring.properties" />
	<!-- 扫描@Required、@Autowired,、 @PreDestroy、@Resource 等注解,不会扫描@Transactional-->
	<!--<context:annotation-config />-->

	<!-- 扫描@Component, @Repository,@Service,@Controller注解的bean,实例化成spring的bean。已经实现了annotation-config的功能 -->
	<context:component-scan base-package="com.rollingstone.physician.ranker" />

	<import resource="spring-data.xml"/>
	<import resource="spring-batch.xml"/>
	<import resource="physician-spring-batch-job-.xml"/>
</beans>
 

 

    spring-data.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:jpa="http://www.springframework.org/schema/data/jpa"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd">
	<!-- 读取poperties文件 -->
	<context:property-placeholder properties-ref="springProperties" />
	<!-- 使注解@Transactional生效-->
	<tx:annotation-driven transaction-manager="transactionManager" />
	<!-- 定义数据库连接-->
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
		destroy-method="close"
		p:driverClass="${app.jdbc.driverClassName}"
		p:jdbcUrl="${app.jdbc.url}"
		p:user="${app.jdbc.username}"
		p:password="${app.jdbc.password}"
		p:acquireIncrement="5"
		p:idleConnectionTestPeriod="60"
		p:maxPoolSize="100"
		p:maxStatements="50"
		p:minPoolSize="10" />
		
	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
		p:dataSource-ref="dataSource" />

	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
       			p:dataSource-ref="dataSource" />
       			
     <bean id="physicianDao" class="com.rollingstone.physician.ranker.spring.dao.PhysicianDao">
     		<property name="dataSource" ref="dataSource" />
    </bean>
	<!-- 系统启动时,将执行如下sql-->
	<jdbc:initialize-database data-source="dataSource">
		<jdbc:script location="classpath*:/org/springframework/batch/core/schema-drop-hsqldb.sql" />
		<jdbc:script location="classpath*:/org/springframework/batch/core/schema-hsqldb.sql" />
		<jdbc:script location="classpath:PhysicianData.sql" />
	</jdbc:initialize-database>
</beans>
 

 

    spring-batch.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xmlns:aop="http://www.springframework.org/schema/aop"
       	xmlns:batch="http://www.springframework.org/schema/batch"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd
			http://www.springframework.org/schema/aop 
			http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
			http://www.springframework.org/schema/batch 
			http://www.springframework.org/schema/batch/spring-batch.xsd">
	<!-- 创建jobLauncher,用来运行job -->
	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
		p:jobRepository-ref="jobRepository" 
		p:taskExecutor-ref="taskExecutor"/>
	
	<!-- 创建线程池 -->
	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	
	<!-- 创建jobRepository,用于存储job信息 -->
	<batch:job-repository id="jobRepository"
	    data-source="dataSource"
	    isolation-level-for-create="DEFAULT"
	    transaction-manager="transactionManager"/>
</beans>
 

 

   physician-spring-batch-job-.xml

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
       	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       	xmlns:p="http://www.springframework.org/schema/p" 
       	xmlns:tx="http://www.springframework.org/schema/tx"
       	xmlns:context="http://www.springframework.org/schema/context"
       	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       	xmlns:util="http://www.springframework.org/schema/util"
       	xmlns:aop="http://www.springframework.org/schema/aop"
       	xmlns:batch="http://www.springframework.org/schema/batch"
       	xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.1.xsd
			http://www.springframework.org/schema/jdbc 
			http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
			http://www.springframework.org/schema/util 
			http://www.springframework.org/schema/util/spring-util-3.1.xsd
			http://www.springframework.org/schema/aop 
			http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
			http://www.springframework.org/schema/batch 
			http://www.springframework.org/schema/batch/spring-batch.xsd">

	<batch:job id="blackListJob" job-repository="jobRepository">
		<batch:step id="blackListLoad">
			<tasklet>
				<!--commit-interval 多少条进行一次commit-->
				<chunk reader="blackListFileItemReader" writer="blackListItemWriter" processor="blackListItemProcessor"
					   commit-interval="${job.commit.interval}"
					   skip-limit="${job.skip_limit}"
					   retry-limit="3">
	            
					<skippable-exception-classes>
						<include class="org.springframework.batch.item.file.FlatFileParseException" />
						<exclude class="java.io.FileNotFoundException"/>
					</skippable-exception-classes>
					<retryable-exception-classes>
						<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
						<exclude class="java.io.FileNotFoundException"/>
					</retryable-exception-classes>
					<no-rollback-exception-classes>
						<include class="org.springframework.batch.item.validator.ValidationException"/>
					</no-rollback-exception-classes>
	           </chunk>
			</tasklet>
		</batch:step>
		<batch:listeners>
			<batch:listener ref="jobExecutionListenerDemo" />
			<batch:listener ref="stepExecutionListenerDemo" />
		</batch:listeners>

		<batch:validator ref="jobParametersValidatorDemo" />
	</batch:job>

	<bean id="blackListFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:csv/BlackListFie.csv" />
		<property name="linesToSkip" value="1"/>
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
						<property name="names" value="NAME,CARD_TYPE,CARD_NUM" />
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.joandora.spring.batch.spring.batch.BlackListFieldSetMapper" />
				</property>
			</bean>
		</property>
	</bean>
</beans>
 

 

 

二、主程序入口

 

 

/***
 * 启动类<br>
 * 从文件中读取数据 保存到数据库中
 */
public class Startup {
	public static void main(String[] args) throws Exception {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");

		BlackListLoaderJob blackListLoaderTask = (BlackListLoaderJob) context.getBean("blackListLoaderJob");
		blackListLoaderTask.loadBlackList();
	}
}
 

 

 blackListLoaderJob定义在physician-spring-batch-job-.xml中定义的一个任务。

 使用如下标签定义:

 

<batch:job id="blackListLoaderJob" job-repository="jobRepository"> </batch:job>
 

 

 

一个job可以定义多个step,可以理解成一个任务分为多少个步骤:

 

 

<batch:step id="blackListLoad">
</atch:step>
 

 

   在step里面定义一个tasklet:

 

<tasklet>
	<!--commit-interval 多少条进行一次commit-->
	<chunk reader="blackListFileItemReader" writer="blackListItemWriter" processor="blackListItemProcessor"
					commit-interval="${job.commit.interval}"
					skip-limit="${job.skip_limit}"
					retry-limit="3">
	            
		<skippable-exception-classes>
			<include class="org.springframework.batch.item.file.FlatFileParseException" />
			<exclude class="java.io.FileNotFoundException"/>
		</skippable-exception-classes>
		<retryable-exception-classes>
			<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
			<exclude class="java.io.FileNotFoundException"/>
		</retryable-exception-classes>
		<no-rollback-exception-classes>
			<include class="org.springframework.batch.item.validator.ValidationException"/>
		</no-rollback-exception-classes>
	</chunk>
</tasklet>
 

 

  chunk的reader属性从文件读取数据,processor是将读取数据进行自定义处理,write将处理过的数据写入数据库。

  commit-interval属性是一次事务提交时处理的记录数。skip-limit属性是允许跳过的最大记录数。retry-limit属性是允许重试的最大记录数。

  skippable-exception-classes属性可以定义报某种错时就跳过该条记录的处理。

  retryable-exception-classes属性可以定义报某种错时就重试该条记录。

  no-rollback-exception-classes属性可以定义报某种错时事务不会回滚。

 

三、读取csv文件 

 

<bean id="blackListFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:csv/BlackListFie.csv" />
		<property name="linesToSkip" value="1"/>
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
						<property name="names" value="NAME,CARD_TYPE,CARD_NUM" />
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.joandora.spring.batch.spring.batch.BlackListFieldSetMapper" />
				</property>
			</bean>
		</property>
	</bean>

 

   属性解释:

   resource:csv文件所在路径

   linesToSkip:csv文件读取数据时,跳过的起始行数

   

四、处理从csv文件读取的每条数据

@Component
public class BlackListItemProcessor implements ItemProcessor<BlackListDomain,BlackListDomain> {
    @Override
    public BlackListDomain process(BlackListDomain item) throws Exception {
        return item;
    }
}

 

 

五、将处理过的数据写入数据库

@Component
public class BlackListItemWriter implements ItemWriter<BlackListDomain> {

    private static final Logger LOG = LoggerFactory.getLogger(BlackListItemWriter.class);

    @Resource
    private IBlackListDao physicianDao;

    @Override
    public void write(List<? extends BlackListDomain> blackListDomains) {
    try {
        for (BlackListDomain blackListDomain : blackListDomains) {
              physicianDao.loadBlackList(blackListDomain);
        }
        physicianDao.queryBlackList();
    } catch (Exception ple) {
        LOG.debug(ple.getMessage());
    }
  }
}

 

 

六、BlackListLoaderJob 

@Component
public class BlackListLoaderJob {

    private static final Logger LOG = LoggerFactory.getLogger(BlackListLoaderJob.class);

    private JobLauncher jobLauncher;

    private Job blackListJob;

    public Job getPhysicianJob() {
        return blackListJob;
    }

    @Autowired
    @Qualifier("blackListJob")
    public void setPhysicianJob(Job physicianJob) {
        this.blackListJob = physicianJob;
    }

    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    @Autowired
    @Required
    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public void loadBlackList() {
        LOG.info("Entered sendEvents");
        try {
            Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
            parameters.put("date", new JobParameter(new Date()));
            jobLauncher.run(blackListJob, new JobParameters(parameters));

        } catch (JobInstanceAlreadyCompleteException ex) {
            LOG.debug("This job has been completed already!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        System.out.println("over----------------");
    }
}

 

 

 七、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-batch-demo</artifactId>
        <groupId>com.joandora</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-batch-writers</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.core.version>4.2.0.RELEASE</spring.core.version>
        <spring.data.jpa.version>1.7.1.RELEASE</spring.data.jpa.version>
        <spring.batch.version>3.0.7.RELEASE</spring.batch.version>
        <cglib.version>2.2</cglib.version>
        <aspectj.version>1.8.2</aspectj.version>
        <c3p0.version>0.9.1.2</c3p0.version>
        <querydsl.version>2.2.5</querydsl.version>
        <slf4j.version>1.7.13</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <!-- Testing -->
        <junit.version>4.12</junit.version>
        <!-- Plugins -->
        <maven.copy.plugin.version>0.2.3</maven.copy.plugin.version>
        <maven.compiler.plugin.version>2.3.2</maven.compiler.plugin.version>
        <maven.apt.plugin.version>1.0</maven.apt.plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Batch -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <!-- A seamless aspect-oriented extension to the Java programming language -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <!-- Cglib is a powerful, high performance and quality Code Generation Library,
        It is used to extend JAVA classes and implements interfaces at runtime.  -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib-nodep</artifactId>
            <version>${cglib.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Logger -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- The Simple Logging Facade for Java or (SLF4J) serves as a simple facade or abstraction
        for various logging frameworks, e.g. java.util.logging, log4j and logback, allowing the end
        user to plug in the desired logging framework at deployment time. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-jpa</artifactId>
            <version>${spring.data.jpa.version}</version>
        </dependency>

        <!-- Database pooling -->
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>${c3p0.version}</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <!-- Testing dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.core.version}</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
        <!-- HSQLDB -->
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <version>2.3.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>

    </build>
</project>

 

 

 

 类似资料: