当前位置: 首页 > 知识库问答 >
问题:

如何运行实现spring-batch来逐行处理csv文件?

竺和洽
2023-03-14

任何建议都将不胜感激。谢谢

共有1个答案

常智勇
2023-03-14

下面是一个将以下csv文件处理成bean的示例

headerA,headerB,headerC
col1,col2,col3

第一行(标题)被忽略,其他列直接映射到“匹配”对象中。(这样做只是为了简洁)。

下面是使用Spring Batch开箱即用组件的作业配置;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <batch:job id="fileJob">
        <batch:step id="fileJob.step1">
            <batch:tasklet>
                <batch:chunk reader="fileReader" writer="databaseWriter" commit-interval="10000"/>
            </batch:tasklet>
        </batch:step>
        <batch:validator>
            <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                <property name="requiredKeys" value="fileName"/>
            </bean>
        </batch:validator>
    </batch:job>

    <bean id="fileReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="file:#{jobParameters['fileName']}"/>
        <property name="linesToSkip" value="1"/>
    </bean>

    <bean id="lineMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
        <property name="lineTokenizer" ref="lineTokenizer"/>
    </bean>


    <bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="col1,col2,col3"/>
    </bean>

    <bean id="fieldSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="de.incompleteco.spring.batch.domain.SimpleEntity"/>
    </bean>

    <bean id="databaseWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
        <property name="sql" value="insert into simple_entity (col1,col2,col3) values (:col1,:col2,:col3)"/>
    </bean>
</beans>
    null
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <batch:job-repository id="jobRepository"/>

    <bean id="jobExplorer"
        class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
    </bean>
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <beans profile="junit">
        <jdbc:embedded-database id="dataSource" type="H2">
            <jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
            <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
        </jdbc:embedded-database>

        <task:executor id="taskExecutor"/>

        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    </beans>
</beans>
package de.incompleteco.spring.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
@ActiveProfiles("junit")
public class FileJobIntegrationTest {

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private DataSource dataSource;

    private int recordCount = 1000000;

    private String fileName = System.getProperty("java.io.tmpdir") + File.separator + "test.csv";

    @Before
    public void before() throws Exception {
        if (new File(fileName).exists()) {
            new File(fileName).delete();
        }//end if
    }

    @Test
    public void test() throws Exception {
        //create a file
        FileOutputStream fos = new FileOutputStream(fileName);
        fos.write("col1,col2,col3".getBytes());
        fos.flush();
        for (int i=0;i<=recordCount;i++) {
            fos.write(new String(i + "," + (i+1) + "," + (i+2) + "\n").getBytes());
            fos.flush();//flush it
        }//end for
        fos.close();
        //lets get the size of the file
        long length = new File(fileName).length();
        System.out.println("file size: " + ((length / 1024) / 1024));
        //execute the job
        JobParameters jobParameters = new JobParametersBuilder().addString("fileName",fileName).toJobParameters();
        JobExecution execution = jobLauncher.run(job,jobParameters);
        //monitor
        while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
            Thread.sleep(1000);
        }//end while
        //load again
        execution = jobExplorer.getJobExecution(execution.getId());
        //test
        assertEquals(ExitStatus.COMPLETED.getExitCode(),execution.getExitStatus().getExitCode());
        //lets see what's in the database
        int count = new JdbcTemplate(dataSource).queryForObject("select count(*) from simple_entity", Integer.class);
        //test
        assertTrue(count == recordCount);
    }

}
 类似资料:
  • 我正在使用spring批处理入站文件,下面是我的用例 将收到包含15个CSV格式文件的zip 我需要并行处理它们 在处理完所有文件后,需要进行一些计算并发送报告 有人能建议我如何使用Spring Batch实现这一点吗?

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“

  • 我在生产中有10个大文件,我们需要从文件中读取每一行,将逗号分隔的值转换为某个值对象,并将其发送到JMS队列,还需要插入数据库中的3个不同的表中 如果我们取10个文件,我们将有3300万行。我们使用spring batch(MultiResourceItemReader)读取earch行,并使用write将其写入db,同时将其发送到JMS。它大约需要25个小时才能完成所有。 为了提高性能,我们考虑

  • 本文向大家介绍VBS和bat批处理逐行读取文件实例,包括了VBS和bat批处理逐行读取文件实例的使用技巧和注意事项,需要的朋友参考一下 首先是批处理的,很简单,每隔两秒钟读取一行。 更直观的: 当然如果你想做更多其他的事 do 后面是你发挥的地方 VBS的两个版本 第一种方式,逐行读取,依次显示: 第二种方式,全部读取,依次显示: VBS读取文本最后一行: Const ForReading = 1

  • 问题内容: 我正在编写一个POC来处理大约10亿行以上的超大文本文件,并为此进行了尝试。 但是,当运行此命令时,会出现此错误; 紧急:单个文件或套接字上的并发操作过多(最大1048575) 我还没有在网上找到任何可以解决此特定错误的信息。我不确定这是否是文件描述符问题,错误中列出的最大值远高于我的限制500,000。 做这个的最好方式是什么? 不太明显,它是我在处理数据时将调用的实际功能的替代品。

  • 问题内容: 运行main方法时,将执行作业。这样我无法弄清楚如何控制作业的执行。例如,您如何安排作业,访问作业执行或设置作业参数的方式。 我试图注册自己的JobLauncher 但是当我尝试在主要方法中使用它时: 当加载上下文时,该作业再次执行,而当我尝试手动运行它时,我得到了。有没有办法防止自动作业执行? 问题答案: 通过设置可以防止作业执行 在application.properties中。或