当前位置: 首页 > 知识库问答 >
问题:

Spring批处理Itemreader将同一对象传递给Itemprocessor两次

嵇弘新
2023-03-14

我有带隔板的Spring批。gridSize是10,因此它将生成10个线程。一切都是默认的Bean单例。TaskExeutor最多有15个核心池10个。

@Bean
@StepScope
public RepositoryItemReader<CustomObject> reader(${executorContext[from]} from, ${executorContext[to], ${executorContext[partitonId]) {
    LOG.info("Partition ID: {} will process row from: {}  to: {}", partitionId, from, to);
    //here has the right output, say 1 to 10, 10 to 20, include from, exclude to
    RepositoryItemReader reader = new RepositoryItemReader();
    reader.setRepository(objectRepo);
    reader.setMethod("findByProcessedFromAndTo");
    //from here I pass in from and to to do the partition
    //omit sorts, pageSize,  params
    reader.setSaveState(false);
    return reader;
}

这是读卡器,这个读卡器将返回数据库中的4行。自定义对象1到4。

@Bean
public class processor implements ItemProcessor() {
   @override
   public Object process(customObject) {
       logger.info(customObject.getId());
       //logic
   }
}

@Bean
Step processStep(){
    //chunk 1
    //item reader
    //item processor
    //item writer
    //build
}

Step partitionStep {
    //partion with gridSize 10,
    //processStep
    //taskExecutor
}

Partition  {
    int start = 1;
    int range = totalCount/gridSize + 1;
    for(i to gridSize){
        ExecutionContext context = New ExecutionContext();
       context.put("from",start);
       context.put("to", start*range);
       start += range;
       context.put("partitionId", i); 
       map.put(PARTITION_KEY, context);      
    }
    return map;
}

示例查询:

select * from Table where rownum >=:from and rownum < :to;

设置非常简单。只是一个批处理过程,分区网格大小为10。

当我运行它时,Item reader会得到4条正确的记录。但当读卡器将数据传递给项目处理器时,我得到了这样的日志,我在编数字。

Thread 2 processing Object Id: 11 //row 10 to 20
Thread 1 processing Object Id: 1 //row  1 to 10
Thread 4 processing Object Id: 31 //row  30 to 40
Thread 6 processing Object Id: 51 //row  50 to 60

因为我实现分区并在查询中执行分区。现在所有线程都应该处理分区集,不应该处理重复的记录,但我仍然有同样的问题,一些线程将处理重复的记录。

Thread 9 processing Object ID:2
Thread 3 processing Object ID:4
and so on

当整个工作完成时,数据库中会有未处理的记录。

我错过了什么,需要帮助吗。

共有1个答案

皮承基
2023-03-14

预期的行为是ItemReader应该只将这4个数据传递给4个线程

这是多线程步骤所期望的,而不是分区步骤。

对于分区步骤,设置网格大小是不够的。您需要使用分区器预先定义分区,并将其设置在主步骤上,以便每个工作步骤处理一个分区。如果不指定分区器,Spring Batch无法知道如何对项目进行分区,并且您的读卡器将用于所有(未定义的)分区,多次读取整个数据集。

 类似资料:
  • 我有一个包含数据(案例)记录的CSV文件,为此我创建了一个对象,但是的某些属性必须是文件名中包含的数据的字段(每个文件都必须有一个包含一些数据的严格结构的名称)。 我想要实现的是将文件名中包含的数据传递给项目处理器,在这里,我将在传递给项目编写器之前将该数据追加到每个中。 当我从控制器启动作业时,是否有人知道如何将数据传递给步骤? 提前道谢。

  • 我正在为一个应用程序使用Apache骆驼和Spring启动。我需要从目录中读取,然后散集xml读取,然后处理未编组的对象以在其中设置更多数据,然后再次列表并将其发送到不同的文件夹。我使用以下路由。请告诉我在解组到处理器后如何发送POJO。目前默认情况下,它是进入处理器的交换。

  • 在我的spring批处理代码中,我正在读取一个csv文件,将其处理成一个bean,并尝试用Writer中的值获取该bean。但我无法在Writer中找到豆子。是包含csv文件中的值的bean,我正试图在Writer中获取这些值。我还将bean初始化为一个公共变量,希望它能由读取器、处理器和写入器处理。甚至在之前使用了和注释。请帮我解决这个问题。下面是我的代码,

  • 我能用一个ItemReader和多个子步骤编写一个Spring批处理步骤吗?每个步骤后面都有一个ItemProcessor和一个ItemWriter? 我正在努力实现这样的目标: 补充说明 为了避免前后矛盾,我宁愿不读两遍 我认为这个问题与Spring Batch不同:一个阅读器、多个处理器和编写器,因为我需要按顺序处理项目,而不是并行处理。

  • 我的服务是开始Spring批工作。我希望能够传递一些对象给作业,每次这个对象参数都会不同。这个对象我需要在我的任务中使用。我开始工作的JobLauncher。据我谷歌,我看到JobParameters不会帮助我在这种情况下。此外,我发现很多答案是使用JobExecttionContext或任何东西。但是我想在作业开始前注入参数对象。它是可拥有的吗? 开始工作的服务 我的小任务

  • 问题内容: 我在php中有对象,每个对象代表一个“项目”以及与之相关的所有信息。 当用户浏览页面时,这些对象应传递给javascript。理想情况下,镜像相同的结构,因此我可以使用Raphael在我的网站上将每个项目及其信息显示为单独的形状。 但是,如何将对象从php转换为javascript? 问题答案: 您可以将PHP对象转换为数组,然后使用JSON函数对其进行编码。之后,从JavaScrip