当前位置: 首页 > 知识库问答 >
问题:

Spring Batch-如何根据上一步创建的参数生成并行步骤

狄玉书
2023-03-14

我正在尝试使用在任务小程序中创建的工作参数来创建任务小程序执行后的步骤。

一个小任务尝试查找一些文件(findFiles()),如果它找到一些文件,它会将文件名保存到字符串列表中。

在tasklet中,我将数据传递如下:chunkContext.getStepContext(). getStepExecution(). getExecutionContext(). put("file", fileNames);

下一步是一个并行流程,其中每个文件都将执行一个简单的读处理器-写器步骤(如果您对我是如何到达那里感兴趣,请参阅我之前的问题:Spring批处理-循环读/处理器/写器步骤)

在构建作业readFilesJob()时,最初会使用“假”文件列表创建一个流,因为只有在执行了tasklet之后,才知道真正的文件列表。

如何配置我的作业,以便首先执行任务,然后使用从任务生成的文件列表执行并行流?

我认为这归根结底是在运行时的正确时刻让文件名列表加载正确的数据。。。但是怎么做呢?

以下是我的简化配置:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());

        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);

        Flow flow = flowBuilder
                .start(findFiles())             
                .next(createParallelFlow(steps))
                .build();       

        return jobBuilderFactory.get("readFilesJob")                
                .start(flow)                
                .end()
                .build();
    }

    private static Flow createParallelFlow(List<Step> steps){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream()
                .map(step ->
                        new FlowBuilder<Flow>("flow_" + step.getName()) 
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) 
             .add(flows.toArray(new Flow[flows.size()]))
             .build();      
    }

    private Step createStep(String fileName){
        return stepBuilderFactory.get("readFile" + fileName)
                .chunk(100)
                .reader(reader(fileName))               
                .writer(writer(filename))                               
                .build();
    }

    private FileFinder findFiles(){
        return new FileFinder();
    }
}

运行并行作业时如何安全地将参数从Tasklet传递到步骤中的问题和答案建议在阅读器/写入器中使用这样的构造:

@Value(“#{jobExecutionContext[filePath]}”)字符串文件路径

但是,由于createParallelFlow()方法中创建步骤的方式,我真的希望可以将fileName作为字符串传递给阅读器/编写器。因此,即使这个问题的答案可能是我这里问题的解决方案,但它不是所需的解决方案。但是如果我错了,请不要副歌纠正我。

我使用文件名示例来更好地澄清这个问题。我的问题实际上不是从一个目录中读取多个文件。我的问题实际上归结为在运行时生成数据并将其传递给下一个动态生成的步骤。

添加了fileFinder的简化tasklet。

@Component
public class FileFinder implements Tasklet, InitializingBean {

    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store dem in the list
        fileNames.add("sample-data1.csv");
        fileNames.add("sample-data2.csv");
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Execution of methods that will find the file names and put them in the list...
        chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);                     
        return RepeatStatus.FINISHED;
    }    
}

共有1个答案

彭嘉赐
2023-03-14

我不确定,我是否正确理解了您的问题,但据我所知,在动态构建作业之前,您需要有包含文件名的列表。

你可以这样做:

@Component
public class MyJobSetup {
    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store dem in the list
        fileNames = ....;
    }
}

之后,您可以将此Bean注入您的JobConfiguration Bean中

@Configuration
@EnableBatchProcessing
@Import(MyJobSetup.class)
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    private  MyJobSetup jobSetup; // <--- Inject
          // PostConstruct of MyJobSetup was executed, when it is injected

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = jobSetUp.getFileNames() // get the list of files
             .stream() // as stream
             .map(file -> createStep(file)) // map...
             .collect(Collectors.toList()); // and create the list of steps
 类似资料:
  • 已知数据表,请问如何根据表的字段创建ES索引,并且能自动实时更新,这个要如何做到呢,大致的步骤是怎么样的?

  • 正如之前介绍的,Flaskr 是一个数据库驱动的应用,更准确的说法 是,一个由关系数据库系统驱动的应用。关系数据库系统需要一个模 式来决定存储信息的方式。所以在第一次开启服务器之前,要点是创 建模式。 可以通过管道把 schema.sql 作为 sqlite3 命令的输入来创建这 个模式,命令为如下: sqlite3 /tmp/flaskr.db < schema.sql 这种方法的缺点是需要安装

  • 本文向大家介绍Python3随机漫步生成数据并绘制,包括了Python3随机漫步生成数据并绘制的使用技巧和注意事项,需要的朋友参考一下 本文为大家分享了Python3随机漫步生成数据并绘制的具体代码,供大家参考,具体内容如下 random_walk.py rw_visual.py 结果图: 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。

  • 本文向大家介绍oracle创建一个数据库三步走,包括了oracle创建一个数据库三步走的使用技巧和注意事项,需要的朋友参考一下 以前开发的时候用得比较多的是mysql和sql server,oracle用的比较少,用起来比较生疏,mysql和sql server用起来比较类似,就oracle的使用方式和他们不同,oracle在创建数据库的时候要对应一个用户,数据库和用户一般一一对应,mysql和s

  • 先创建规范 在 NEI 平台 上点击“工程规范 -> 新建规范”,输入规范名称,比如 TodoSpec,点击“保存” 点击“工程结构”,新建一个文件,文件名为 ``,在右边选择“数据模型列表填充” 将 JavaBean 文件的示例模板中的内容复制到新创建的文件内容中 然后创建工程 点击“项目管理”,在某个项目组下创建一个项目,比如 TodoWeb 在刚才创建的 TodoWeb 项目下,点击“资源管

  • 问题内容: 我正在寻找一种基于接收到的参数来同步方法的方法,如下所示: 我希望基于这样的参数来同步方法: 线程1:doSomething(“ a”); 线程2:doSomething(“ b”); 线程3:doSomething(“ c”); 线程4:doSomething(“ a”); 线程1,线程2和线程3将在不同步的情况下执行代码,但是线程4将等待直到线程1完成代码,因为它具有相同的“ a”