当前位置: 首页 > 知识库问答 >
问题:

Spring批处理分区-所有线程处理相同的记录

沈开畅
2023-03-14

我正在spring Boot中使用异步任务执行器对数百万条记录的数据进行分区,块大小为1000条,网格大小为10条。为了从数据库中获取特定的分区数据,我正在使用项目读取器的before步骤中的StepExecution获取分区数据的开始和结束索引(来自Partitioner类)。

例如:项目阅读器

beforeStep(StepExecution execution){
int startIndex = execution.getExecutionContext().getInt("startIndexValue")
int endIndex = execution.getExecutionContext().getInt("endIndexValue")
List testDataList = getTestDatabetween(startIndex, endIndex);
}

Item Reader遍历testData列表并将testData值返回给writer

testData read()
{
if(!testData.isEmpty()){
testData = testDataList.get(testIndex);
testIndex++;
}
return testData;
}

TestData=Partition1、Partition2、Partition3

只有最后一个分区即Partition3被读取、处理和写入。

我希望所有分区同时读取。

共有1个答案

西门展
2023-03-14

这很可能是阅读器的线程安全问题。测试这一点的一种方法是将gridsize减少到1,看看它是否处理。

请确保它位于scope=“step”中,以确保每个分区都有一个新实例。

<bean id="yourReader" class="com.example.reader.YourReader" scope="step">
    <!-- properties -->
</bean>
 类似资料:
  • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同

  • 通过定义节流限制来解决:这里定义的相同:Spring batch Multithreading:节流限制影响 我注意到当我用20k条记录运行批处理时,一些线程已经开始处理,但在10个请求后就停止了。但是,其他线程正在正常处理。你能建议一下问题是什么吗?如果我保持corepoolsize=threadpoolsize=5,那么所有的线程都是正确分布的。

  • 我有一个要求,我想在下面的场景中使用Spring批处理框架。 我有一个在交易日期列上分区的表。我想使用Spring批处理框架的阅读器、处理器和写入器来处理该表的记录。我想做的是根据交易日期创建单独的线程进行读取、写入和处理。假设有4个交易日期,那么我想为单独的交易日期创建4个单独的线程。在每个线程中,阅读器将从该交易日期的表中读取记录,在处理器中丰富记录,然后在写入器中发布/写入。 我是Sprin

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我使用的是Spring Batch 2.1.8。释放我有一个文件,它由一些头信息和一些需要处理的记录组成。 我有一个使用面向块处理的步骤。该步骤包含ItemReader和ItemWriter的实现。ItemReader实现是线程安全的,而ItemWriter不是。 我想在处理(或写入)任何记录之前使用标题信息。在继续使用面向块的处理时,如何确保这一点? 建议的解决方案:一种解决方案可以是编写一个预