使用Spring batch 2.2.1,我配置了Spring batch作业,我使用了以下方法:
配置如下所示:
>
Tasklet使用ThreadPoolTaskExecutor,限制为15个线程
油门限制等于线程数
块用于:
>
1个JdbcCursorItemReader的同步适配器,以允许它被许多线程使用,如Spring Batch留档命令
您可以同步对read()的调用,只要处理和写入是块中最昂贵的部分,您的步骤仍然可以比单线程配置更快地完成。
SaveState在JdbcCursorItemReader上为false
基于JPA的自定义ItemWriter。请注意,它对一个项目的处理可能会在流转时长方面有所不同,它可能需要几毫到几秒钟(
提交间隔设置为1(我知道这可能更好,但这不是问题所在)
所有的jdbc池都很好,关于Spring Batch doc的恢复
由于以下原因,运行批处理会导致非常奇怪和糟糕的结果:
查看Spring Batch代码,根本原因似乎在这个包中:
这种工作方式是功能还是限制/错误?
如果这是一个特性,那么通过配置,如何使所有线程都不会因长时间的处理工作而变得匮乏,而不必重写所有内容?
请注意,如果所有项目占用相同的时间,则一切正常,多线程也可以,但如果其中一个项目处理需要更多的时间,那么多线程在缓慢的过程中几乎是无用的。
注:我打开了这个问题:
在我的例子中,如果我没有设置限制,那么ItemReader的read()方法中只有4个线程,这也是默认的线程数,如果没有按照Spring批处理文档在tasklet标记中指定的话。
如果我指定更多线程,例如10、20或100,那么ItemReader的read()方法中只有8个线程
以下是我认为正在发生的事情:
换句话说,为了让Spring批处理中的多线程方法有所帮助,每个线程需要在大约相同的时间内处理。考虑到某些项目的处理时间之间存在巨大差异的场景,您正在经历一个限制,其中许多线程都已完成,并等待一个长时间运行的同级线程才能进入下一个处理块。
我的建议是:
正如Alex所说,根据javadocs,这种行为似乎是一种契约:
子类只需要提供一个获取下一个结果*的方法,以及一个等待所有结果从并发*进程或线程返回的方法
看看:
任务执行者重复模板#等待结果
另一个选择是使用分区:
Michael Minella在他的Pro Spring Batch一书的第11章中解释了这一点:
<batch:job id="batchWithPartition">
<batch:step id="step1.master">
<batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
</batch:step>
</batch:job>
<!-- This one will create Paritions of Number of lines/ Grid Size-->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="myItemReader"
writer="manipulatableWriterForTests" commit-interval="1"
skip-limit="30000">
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
<!-- scope step is critical here-->
<bean id="myItemReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="dataSource"/>
<property name="sql">
<value>
<![CDATA[
select * from customers where id >= ? and id <= ?
]]>
</value>
</property>
<property name="preparedStatementSetter">
<bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
<value>{stepExecutionContext[minValue]}</value>
<value>#{stepExecutionContext[maxValue]}</value>
</list>
</property>
</bean>
</property>
<property name="rowMapper" ref="customerRowMapper"/>
</bean>
Partitioner.java:
package ...;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class ColumnRangePartitioner implements Partitioner {
private String column;
private String table;
public Map<String, ExecutionContext> partition(int gridSize) {
int min = queryForInt("SELECT MIN(" + column + ") from " + table);
int max = queryForInt("SELECT MAX(" + column + ") from " + table);
int targetSize = (max - min) / gridSize;
System.out.println("Our partition size will be " + targetSize);
System.out.println("We will have " + gridSize + " partitions");
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
System.out.println("minValue = " + start);
System.out.println("maxValue = " + end);
start += targetSize;
end += targetSize;
number++;
}
System.out.println("We are returning " + result.size() + " partitions");
return result;
}
public void setColumn(String column) {
this.column = column;
}
public void setTable(String table) {
this.table = table;
}
}
我在没有ItemWriter的情况下定义了我的tasklet,如下所示: 我得到了这个错误: 配置问题:
我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(
我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出
关于spring batch tasklet与Task-Executor的步骤,我遇到了一个奇怪的问题。配置是正常和简单的,只是一个tasklet(不是面向块的),如下所示: someBean是一个实例实现的Tasklet接口。stange的问题是,当我启动作业时,execute方法调用了两次: 实际上,创建了两个线程,并执行了两次该逻辑。如果将task-executor更改为普通的(org.sp
第2步。根据在步骤1中创建的对象列表中有多少项创建步骤列表。 第三步。尝试执行步骤2中创建的步骤列表中的步骤。 下面在executeDynamicStepsTasklet()中执行x个步骤。虽然代码运行时没有任何错误,但它似乎没有做任何事情。我在那个方法中的内容看起来正确吗?
我有一个多步骤Spring Batch作业,在其中一个步骤中,我为在阅读器中读取的数据创建Lucene索引,以便后续步骤可以在该Lucene索引中搜索。 基于中读取的数据,我将索引分散到几个单独的目录中。 如果我指定Step Task Executor为,只要索引总是写入不同的目录,我就不会遇到任何问题,但有时会出现锁定异常。我猜,两个线程试图写入同一个索引。 如果我删除了,我不会遇到任何问题,但