尝试使用MultiResourceItemReader读取spring批处理中的多个文件,并为每个文件中的记录提供taskExecutor以在多线程中读取。假设一个文件夹中有3个csv文件,MultiResourceItemReader应该逐个执行它,但由于我有taskExecutor,不同的线程占用csv文件,就像两个csv文件被来自同一文件夹的线程占用并开始执行一样。
期望:-MultiResourceItemReader应该读取第一个文件,然后taskExecutor应该生成不同的线程并执行。然后应该拾取另一个文件,并由taskExecutor进行执行。
代码段/batch_configuration:-@bean public Step Step1(){return stepBuilderFactory.get(“Step1”)。
@Bean
public MultiResourceItemReader<POJO> multiResourceItemReader()
{
MultiResourceItemReader<POJO> resourceItemReader = new MultiResourceItemReader<POJO>();
ClassLoader cl = this.getClass().getClassLoader();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
Resource[] resources;
try {
resources = resolver.getResources("file:/temp/*.csv");
resourceItemReader.setResources(resources);
resourceItemReader.setDelegate(itemReader());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return resourceItemReader;
}
也许您应该检查Partitioner(https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning)并实现如下所示:
@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
FlatFileItemReader itemReader,
ListDelegateWriter listDelegateWriter,
BatchProperties batchProperties) {
return stepBuilderFactory.get(Steps.MAIN)
.<POJO, POJO>chunk(pageSize)
.reader(unmatchedItemReader)
.writer(listDelegateWriter)
.build();
}
@Bean
public TaskExecutor jobTaskExecutor(@Value("${batch.config.core-pool-size}") Integer corePoolSize,
@Value("${batch.config.max-pool-size}") Integer maxPoolSize) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobExecutionContext[ResourcesToRead]}") String[] resourcePaths,
@Value("${batch.config.grid-size}") Integer gridSize) {
Resource[] resourceList = Arrays.stream(resourcePaths)
.map(FileSystemResource::new)
.toArray(Resource[]::new);
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setResources(resourceList);
partitioner.partition(gridSize);
return partitioner;
}
@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step mainStep, TaskExecutor jobTaskExecutor) {
return stepBuilderFactory.get(BatchConstants.MASTER)
.partitioner(mainStep)
.partitioner(Steps.MAIN, partitioner)
.taskExecutor(jobTaskExecutor)
.build();
}
我必须使用Spring Batch配置一个作业。是否可以有一个单线程的项目阅读器,但多线程处理器? 在这种情况下,ItemReader将通过从数据库中读取工作项(通过执行预定义的查询)来创建要处理的工作项,每个处理器将并行处理项/块。
我有一个Spring批处理应用程序,它从mysql数据库读取数据,并将其写入csv文件。 问题:当我看到csv中的数据时,我面临的问题是,一些大的数字显示为指数值。例如,对于这个数字“4491611100277480”,它显示为“4.49161E 15” 下面是我正在使用的代码片段
在我的spring-batch-integration应用程序中,文件轮询调用eachfile的batchjob,该应用程序可以在多个服务器(节点)上运行,但它们都应该读取一个公共目录。代码如下 } 现在,当我调用Spring批处理并尝试使用flatfileitemreader读取该文件时,它给了我
我有一个商业案例,使用Spring batch将多个csv文件(每个文件大约1000个,包含1000条记录)合并成单个csv。 请帮助我提供方法和性能方面的指导和解决方案。 到目前为止,我已经尝试了两种方法, 方法1。 Tasklet chunk与multiResourceItemReader一起从目录中读取文件,FlatFileItemWriter作为项目编写器。 这里的问题是,它的处理速度非常
考虑一个阶跃豆: 要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。 在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行