当前位置: 首页 > 知识库问答 >
问题:

如何在spring批处理中使用MultiResourceItemReader在读取单个csv后指定taskexecutor生成线程

易宏阔
2023-03-14

尝试使用MultiResourceItemReader读取spring批处理中的多个文件,并为每个文件中的记录提供taskExecutor以在多线程中读取。假设一个文件夹中有3个csv文件,MultiResourceItemReader应该逐个执行它,但由于我有taskExecutor,不同的线程占用csv文件,就像两个csv文件被来自同一文件夹的线程占用并开始执行一样。

期望:-MultiResourceItemReader应该读取第一个文件,然后taskExecutor应该生成不同的线程并执行。然后应该拾取另一个文件,并由taskExecutor进行执行。

代码段/batch_configuration:-@bean public Step Step1(){return stepBuilderFactory.get(“Step1”)。 chunk(5)。reader(multiResourceItemReader())。writer(writer())。taskExecutor(taskExecutor())。throttleLimit(throttleLimit)。build();}

@Bean
public MultiResourceItemReader<POJO> multiResourceItemReader() 
{
    MultiResourceItemReader<POJO> resourceItemReader = new MultiResourceItemReader<POJO>();
    ClassLoader cl = this.getClass().getClassLoader();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);

    Resource[] resources;
    try {
        resources = resolver.getResources("file:/temp/*.csv");
         resourceItemReader.setResources(resources);
            resourceItemReader.setDelegate(itemReader());
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return resourceItemReader;
}

共有1个答案

毕衡
2023-03-14

也许您应该检查Partitioner(https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning)并实现如下所示:

@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
                     FlatFileItemReader itemReader,
                     ListDelegateWriter listDelegateWriter,
                     BatchProperties batchProperties) {
    return stepBuilderFactory.get(Steps.MAIN)
            .<POJO, POJO>chunk(pageSize)
            .reader(unmatchedItemReader)
            .writer(listDelegateWriter)
            .build();
}

@Bean
public TaskExecutor jobTaskExecutor(@Value("${batch.config.core-pool-size}") Integer corePoolSize,
                                    @Value("${batch.config.max-pool-size}") Integer maxPoolSize) {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(corePoolSize);
    taskExecutor.setMaxPoolSize(maxPoolSize);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobExecutionContext[ResourcesToRead]}") String[] resourcePaths,
                               @Value("${batch.config.grid-size}") Integer gridSize) {
    Resource[] resourceList = Arrays.stream(resourcePaths)
            .map(FileSystemResource::new)
            .toArray(Resource[]::new);
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setResources(resourceList);
    partitioner.partition(gridSize);
    return partitioner;
}

@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step mainStep, TaskExecutor jobTaskExecutor) {
    return stepBuilderFactory.get(BatchConstants.MASTER)
            .partitioner(mainStep)
            .partitioner(Steps.MAIN, partitioner)
            .taskExecutor(jobTaskExecutor)
            .build();
}
 类似资料:
  • 我必须使用Spring Batch配置一个作业。是否可以有一个单线程的项目阅读器,但多线程处理器? 在这种情况下,ItemReader将通过从数据库中读取工作项(通过执行预定义的查询)来创建要处理的工作项,每个处理器将并行处理项/块。

  • 我有一个Spring批处理应用程序,它从mysql数据库读取数据,并将其写入csv文件。 问题:当我看到csv中的数据时,我面临的问题是,一些大的数字显示为指数值。例如,对于这个数字“4491611100277480”,它显示为“4.49161E 15” 下面是我正在使用的代码片段

  • 在我的spring-batch-integration应用程序中,文件轮询调用eachfile的batchjob,该应用程序可以在多个服务器(节点)上运行,但它们都应该读取一个公共目录。代码如下 } 现在,当我调用Spring批处理并尝试使用flatfileitemreader读取该文件时,它给了我

  • 我有一个商业案例,使用Spring batch将多个csv文件(每个文件大约1000个,包含1000条记录)合并成单个csv。 请帮助我提供方法和性能方面的指导和解决方案。 到目前为止,我已经尝试了两种方法, 方法1。 Tasklet chunk与multiResourceItemReader一起从目录中读取文件,FlatFileItemWriter作为项目编写器。 这里的问题是,它的处理速度非常

  • 考虑一个阶跃豆: 要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。 在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行