当前位置: 首页 > 知识库问答 >
问题:

Spring批处理:带有多线程执行器的Tasklet在节流算法方面的性能非常差

上官扬
2023-03-14

使用Spring batch 2.2.1,我配置了Spring batch作业,我使用了以下方法:

  • http://static.springsource.org/spring-batch/reference/html/scalability.html#multithreadedStep

配置如下所示:

>

  • Tasklet使用ThreadPoolTaskExecutor,限制为15个线程

    油门限制等于线程数

    块用于:

    >

  • 1个JdbcCursorItemReader的同步适配器,以允许它被许多线程使用,如Spring Batch留档命令

    您可以同步对read()的调用,只要处理和写入是块中最昂贵的部分,您的步骤仍然可以比单线程配置更快地完成。

    SaveState在JdbcCursorItemReader上为false

    基于JPA的自定义ItemWriter。请注意,它对一个项目的处理可能会在流转时长方面有所不同,它可能需要几毫到几秒钟(

    提交间隔设置为1(我知道这可能更好,但这不是问题所在)

    所有的jdbc池都很好,关于Spring Batch doc的恢复

    由于以下原因,运行批处理会导致非常奇怪和糟糕的结果:

    • 在某个步骤中,如果编写器处理这些项目需要一些时间,那么线程池中几乎所有线程都不会执行任何操作,而不是进行处理,只有速度较慢的编写器在工作

    查看Spring Batch代码,根本原因似乎在这个包中:

    • org/spring框架/批处理/重复/支持/

    这种工作方式是功能还是限制/错误?

    如果这是一个特性,那么通过配置,如何使所有线程都不会因长时间的处理工作而变得匮乏,而不必重写所有内容?

    请注意,如果所有项目占用相同的时间,则一切正常,多线程也可以,但如果其中一个项目处理需要更多的时间,那么多线程在缓慢的过程中几乎是无用的。

    注:我打开了这个问题:

    • https://jira.springsource.org/browse/BATCH-2081
  • 共有3个答案

    江文斌
    2023-03-14

    在我的例子中,如果我没有设置限制,那么ItemReader的read()方法中只有4个线程,这也是默认的线程数,如果没有按照Spring批处理文档在tasklet标记中指定的话。

    如果我指定更多线程,例如10、20或100,那么ItemReader的read()方法中只有8个线程

    傅阳
    2023-03-14

    以下是我认为正在发生的事情:

    • 如您所说,您的ThreadPoolTaskExector仅限于15个线程
    • 框架的“块”导致JdbcCursorItemReader中的每个项目(最高线程限制)在不同的线程中执行
    • 但是Spring Batch框架也在等待每个线程(即所有15个)在移动到下一个块之前完成其单独的读/进程/写流,给定您的提交间隔为1。有时,这会导致14个线程在一个兄弟线程上等待近60秒,而这需要很长时间才能完成。

    换句话说,为了让Spring批处理中的多线程方法有所帮助,每个线程需要在大约相同的时间内处理。考虑到某些项目的处理时间之间存在巨大差异的场景,您正在经历一个限制,其中许多线程都已完成,并等待一个长时间运行的同级线程才能进入下一个处理块。

    我的建议是:

    • 一般来说,我想说增加提交间隔会有所帮助,因为它应该允许在提交之间的单个线程中处理多个游标项,即使其中一个线程卡在长时间运行的写入上。但是,如果您不走运,多个长事务可能会在同一个线程中发生并使情况变得更糟(例如,120秒。在一个线程中的提交间隔为2)。
    • 具体来说,我建议将您的线程池大小增加到一个很大的数字,甚至超过您的最大数据库连接2倍或3倍。应该发生的是,即使您的一些线程会阻止尝试获取连接(因为线程池大小很大),您实际上会看到吞吐量的增加,因为您的长时间运行的线程不再阻止其他线程从游标中获取新项目并在此期间继续您的批处理工作(在块开始时,您的挂起线程数量将大大超过您的可用数据库连接数量。因此,当操作系统调度程序激活在获取数据库连接时被阻止并必须停用线程的线程时,它会有点混乱。但是,由于您的大多数线程都会相对较快地完成工作并释放它们的数据库连接,因此您应该会看到,随着许多线程继续获取数据库连接、执行工作、释放数据库连接并允许更多线程执行相同的操作,即使您的长时间运行的线程正在执行它们的操作,总体上您的吞吐量也会提高)。
    伯俊弼
    2023-03-14

    正如Alex所说,根据javadocs,这种行为似乎是一种契约:

    子类只需要提供一个获取下一个结果*的方法,以及一个等待所有结果从并发*进程或线程返回的方法

    看看:

    任务执行者重复模板#等待结果

    另一个选择是使用分区:

    • 一个TaskExecutorParattionHandler,它将执行来自分区ItemReader的项目,见下文
    • 一个给出要由ItemReader处理的范围的分区器实现,请参阅下面的ColumnRangeParater
    • 一个CustomReader,它将使用分区器将填充的内容读取数据,请参阅下面的myItemReader配置

    Michael Minella在他的Pro Spring Batch一书的第11章中解释了这一点:

    <batch:job id="batchWithPartition">
        <batch:step id="step1.master">
            <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
        </batch:step>       
    </batch:job>
    <!-- This one will create Paritions of Number of lines/ Grid Size--> 
    <bean id="myPartitioner" class="....ColumnRangePartitioner"/>
    <!-- This one will handle every partition in a Thread -->
    <bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
        <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
        <property name="step" ref="step1" />
        <property name="gridSize" value="10" />
    </bean>
    <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="myItemReader"
                    writer="manipulatableWriterForTests" commit-interval="1"
                    skip-limit="30000">
                    <batch:skippable-exception-classes>
                        <batch:include class="java.lang.Exception" />
                    </batch:skippable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
    </batch:step>
     <!-- scope step is critical here-->
    <bean id="myItemReader"    
                            class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
        <property name="dataSource" ref="dataSource"/>
        <property name="sql">
            <value>
                <![CDATA[
                    select * from customers where id >= ? and id <=  ?
                ]]>
            </value>
        </property>
        <property name="preparedStatementSetter">
            <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
     <!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
                        <value>{stepExecutionContext[minValue]}</value>
                        <value>#{stepExecutionContext[maxValue]}</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="rowMapper" ref="customerRowMapper"/>
    </bean>
    

    Partitioner.java:

     package ...;
      import java.util.HashMap;  
     import java.util.Map;
     import org.springframework.batch.core.partition.support.Partitioner;
     import org.springframework.batch.item.ExecutionContext;
     public class ColumnRangePartitioner  implements Partitioner {
     private String column;
     private String table;
     public Map<String, ExecutionContext> partition(int gridSize) {
        int min =  queryForInt("SELECT MIN(" + column + ") from " + table);
        int max = queryForInt("SELECT MAX(" + column + ") from " + table);
        int targetSize = (max - min) / gridSize;
        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have " + gridSize + " partitions");
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }
    public void setColumn(String column) {
        this.column = column;
    }
    public void setTable(String table) {
        this.table = table;
    }
    }
    
     类似资料:
    • 我在没有ItemWriter的情况下定义了我的tasklet,如下所示: 我得到了这个错误: 配置问题:

    • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

    • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

    • 关于spring batch tasklet与Task-Executor的步骤,我遇到了一个奇怪的问题。配置是正常和简单的,只是一个tasklet(不是面向块的),如下所示: someBean是一个实例实现的Tasklet接口。stange的问题是,当我启动作业时,execute方法调用了两次: 实际上,创建了两个线程,并执行了两次该逻辑。如果将task-executor更改为普通的(org.sp

    • 第2步。根据在步骤1中创建的对象列表中有多少项创建步骤列表。 第三步。尝试执行步骤2中创建的步骤列表中的步骤。 下面在executeDynamicStepsTasklet()中执行x个步骤。虽然代码运行时没有任何错误,但它似乎没有做任何事情。我在那个方法中的内容看起来正确吗?

    • 我有一个多步骤Spring Batch作业,在其中一个步骤中,我为在阅读器中读取的数据创建Lucene索引,以便后续步骤可以在该Lucene索引中搜索。 基于中读取的数据,我将索引分散到几个单独的目录中。 如果我指定Step Task Executor为,只要索引总是写入不同的目录,我就不会遇到任何问题,但有时会出现锁定异常。我猜,两个线程试图写入同一个索引。 如果我删除了,我不会遇到任何问题,但