当前位置: 首页 > 知识库问答 >
问题:

分区Spring批处理步骤重复相同成功的从步骤执行

朱通
2023-03-14

使用Spring Batch 3.0.4.Release。

我将作业配置为使用分区步骤。从机步骤使用块大小1。任务执行器中有六个线程。我使用从六到数百的各种网格大小来运行这个测试。我的网格大小是从StepExecutions的数量,我希望==我的分区器创建的ExecutionContexts的数量。

下面是Java配置代码:

@Bean
@StepScope
public RapRequestItemReader rapReader(
        @Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
        final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
    final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
            .collect(Collectors.toList());
    return new RapRequestItemReader(timeString, farms);
}

@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
    return rapRequest -> {
        return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
    };
}

@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
    return stepBuilderFactory.get(RAP_STEP_NAME)
            .<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
            .reader(rapReader)
            .processor(rapProcessor())
            .writer(updateCoverageWriter)
            .build();
}

private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
    RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
    return partitioner;
}

@Bean
public ThreadPoolTaskExecutor pullExecutor() {
    ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
    pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
    pullExecutor.setAllowCoreThreadTimeOut(true);
    return pullExecutor;
}

@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
        TaskletStep rapStep1) {
    RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
    int gridSize = calculateGridSize(rapParameter);
    return stepBuilderFactory.get("rapPartitionByTimestampStep")
            .partitioner(rapStep1)
            .partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
            .taskExecutor(pullExecutor())
            .gridSize(gridSize)
            .build();
}

@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
    return jobBuilderFactory.get(JOB_NAME)
            .start(rapPartitionByTimestampStep)
            .build();
}

共有1个答案

谷星文
2023-03-14

虽然很难从问题中分辨出来,但问题出在阅读器上。ItemReader从未返回Null。

在设计中,StepExecution应该只处理一个项目。但是,在处理该项之后,ItemReader将再次返回相同的项,而不是返回NULL。

我通过让ItemReader在第二次调用read时返回null来修复它。

更好的设计可能是使用TaskletStep而不是ChunkStep。

 类似资料:
  • 给定一个使用分区的Spring批处理作业,是否可能有多个分区步骤? 例如: 在上述示例中,是否可以将另一个添加到(最好不需要为每个分区步骤提供分区器)?如果没有,是否有其他方法来配置多个步骤,这些步骤将针对每个分区逐个执行?

  • 我正在尝试修复Spring Batch中的一个问题,这个问题最近一直困扰着我们的系统。我们有一份工作,在大多数情况下都很好。下载和处理数据是一个多步骤的工作。 问题是有时工作会爆棚。也许我们试图连接到的服务器抛出了错误,或者我们在工作进行到一半时关闭了服务器。此时,下次我们的quartz调度程序尝试运行该作业时,它似乎什么也不做。以下是此作业定义的删节版本: 委婉地说,我是Spring Batch

  • 我有一个spring批处理作业,预计将根据FIFO顺序处理'N'个作业ID。这个Spring批处理作业有5个步骤。 我们使用DECIDER来确定是否有更多的job-id。如果是,请转到第一步并运行该job-id的所有步骤。 我在spring-batch发出的日志中看到“duplicate step”消息,在第一个作业中的步骤(例如job-id=1)获得未知状态之前,该消息似乎没有问题。在这种情况下

  • 我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem

  • 我正在尝试为分区配置Spring批处理步骤。这里很好的示例显示了一个关于“ID范围”的分区,但我不知道如何从“数据页”范围开始。 在我的顺序步骤中,我有: null

  • 我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫