使用Spring Batch 3.0.4.Release。
我将作业配置为使用分区步骤。从机步骤使用块大小1。任务执行器中有六个线程。我使用从六到数百的各种网格大小来运行这个测试。我的网格大小是从StepExecutions的数量,我希望==我的分区器创建的ExecutionContexts的数量。
下面是Java配置代码:
@Bean
@StepScope
public RapRequestItemReader rapReader(
@Value("#{stepExecutionContext['" + RapJobConfig.LIST_OF_IDS_STEP_EXECUTION_CONTEXT_VAR + "']}") String listOfIds,
final @Value("#{stepExecutionContext['" + RapJobConfig.TIME_STEP_EXECUTION_CONTEXT_VAR + "']}") String timeString) {
final List<Asset> farms = Arrays.asList(listOfIds.split(",")).stream().map(intString -> assetDao.getById(Integer.valueOf(intString)))
.collect(Collectors.toList());
return new RapRequestItemReader(timeString, farms);
}
@Bean
public ItemProcessor<RapRequest, PullSuccess> rapProcessor() {
return rapRequest -> {
return rapPull.pull(rapRequest.timestamp, rapRequest.farms);
};
}
@Bean
public TaskletStep rapStep1(StepBuilderFactory stepBuilderFactory, RapRequestItemReader rapReader) {
return stepBuilderFactory.get(RAP_STEP_NAME)
.<RapRequest, PullSuccess> chunk(RAP_STEP_CHUNK_SIZE)
.reader(rapReader)
.processor(rapProcessor())
.writer(updateCoverageWriter)
.build();
}
private RapFilePartitioner createRapFilePartitioner(RapParameter rapParameter) {
RapFilePartitioner partitioner = new RapFilePartitioner(rapParameter, rapPull.getIncrementHours());
return partitioner;
}
@Bean
public ThreadPoolTaskExecutor pullExecutor() {
ThreadPoolTaskExecutor pullExecutor = new ThreadPoolTaskExecutor();
pullExecutor.setCorePoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setMaxPoolSize(weatherConfig.getNumberOfThreadsPerModelType());
pullExecutor.setAllowCoreThreadTimeOut(true);
return pullExecutor;
}
@Bean
@JobScope
public Step rapPartitionByTimestampStep(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['config']}") String config,
TaskletStep rapStep1) {
RapParameter rapParameter = GsonHelper.fromJson(config, RapParameter.class);
int gridSize = calculateGridSize(rapParameter);
return stepBuilderFactory.get("rapPartitionByTimestampStep")
.partitioner(rapStep1)
.partitioner(RAP_STEP_NAME, createRapFilePartitioner(rapParameter))
.taskExecutor(pullExecutor())
.gridSize(gridSize)
.build();
}
@Bean
public Job rapJob(JobBuilderFactory jobBuilderFactory, Step rapPartitionByTimestampStep) {
return jobBuilderFactory.get(JOB_NAME)
.start(rapPartitionByTimestampStep)
.build();
}
虽然很难从问题中分辨出来,但问题出在阅读器上。ItemReader从未返回Null。
在设计中,StepExecution应该只处理一个项目。但是,在处理该项之后,ItemReader将再次返回相同的项,而不是返回NULL。
我通过让ItemReader在第二次调用read时返回null来修复它。
更好的设计可能是使用TaskletStep而不是ChunkStep。
给定一个使用分区的Spring批处理作业,是否可能有多个分区步骤? 例如: 在上述示例中,是否可以将另一个添加到(最好不需要为每个分区步骤提供分区器)?如果没有,是否有其他方法来配置多个步骤,这些步骤将针对每个分区逐个执行?
我正在尝试修复Spring Batch中的一个问题,这个问题最近一直困扰着我们的系统。我们有一份工作,在大多数情况下都很好。下载和处理数据是一个多步骤的工作。 问题是有时工作会爆棚。也许我们试图连接到的服务器抛出了错误,或者我们在工作进行到一半时关闭了服务器。此时,下次我们的quartz调度程序尝试运行该作业时,它似乎什么也不做。以下是此作业定义的删节版本: 委婉地说,我是Spring Batch
我有一个spring批处理作业,预计将根据FIFO顺序处理'N'个作业ID。这个Spring批处理作业有5个步骤。 我们使用DECIDER来确定是否有更多的job-id。如果是,请转到第一步并运行该job-id的所有步骤。 我在spring-batch发出的日志中看到“duplicate step”消息,在第一个作业中的步骤(例如job-id=1)获得未知状态之前,该消息似乎没有问题。在这种情况下
我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem
我正在尝试为分区配置Spring批处理步骤。这里很好的示例显示了一个关于“ID范围”的分区,但我不知道如何从“数据页”范围开始。 在我的顺序步骤中,我有: null
我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫