当前位置: 首页 > 知识库问答 >
问题:

如何定义spring batch中另一个流中的并行子流?

齐俊达
2023-03-14

我想在spring batch中实现如下的流结构。

           Job
          /   \
       Flow1  Flow2  
         /      \
      Step1    Step2
        /       /  \
       /    Step3  Flow3
      /                \  
     /                 Step4
     \                  /
      \                /
       \              /    
            Step5    

作业配置伪代码如下:

@Configuration
public class JobConfiguration {

......

    @Bean
    public Job Job() {

    Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                            .start(step1())
                            .build();

    Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
           .start(step2())
           .next(step3())
           .split(new SimpleAsyncTaskExecutor()).add(flow3)
           .build();

    Flow flow3 = new FlowBuilder<SimpleFlow>("flow3")
                            .start(step4())
                            .build();

    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow1)
            .split(new SimpleAsyncTaskExecutor()).add(flow2)
            .next(Step5())
            .end()
            .build();
    }
......
}

当我运行批处理时,日志显示执行了步骤1、步骤2、步骤3和步骤5,但没有运行步骤4。

我想知道如何在另一个流中定义子流,上面的代码是实现它的正确方法吗?

提前谢谢!

共有1个答案

姜振濂
2023-03-14

单独运行每个流表明Flow1Flow3是正确的,但Flow2不是正确的。只运行Flow2

    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow2)
            .build()
            .build();

显示执行了step2和step3,但没有执行step4。所以问题是这个流的定义。

您需要在Step3Flow3之间定义一个并行流,就像您对Flow1Flow2所做的那样。这是一个示例:

@Bean
public Job Job() {

    Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
            .start(step1())
            .build();

    Flow flow3 = new FlowBuilder<SimpleFlow>("flow3")
            .start(step4())
            .build();

    Flow parallelFlow = new FlowBuilder<SimpleFlow>("parallelFlow")
            .start(step3())
            .split(new SimpleAsyncTaskExecutor()).add(flow3)
            .build();

    Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
            .start(step2())
            .next(parallelFlow)
            .build();

    return jobs.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow1)
            .split(new SimpleAsyncTaskExecutor()).add(flow2)
            .next(step5())
            .end()
            .build();
}

希望这有帮助。

 类似资料:
  • 我在spring Portlet a中定义了一个spring bean,我的问题是如何使用Portlet bean定位器在另一个Portlet B中定位这个bean,这可能吗,因为下面的行对我不起作用。 ps:我将Portlet中的ContextLoaderListener声明为web.xml,并使用contextConfigLocation来设置加载哪个上下文文件,这个bean是在上下文文件中定

  • 用例:步骤1:ItemReader:从数据库中读取1000个ItemProcessor块中的数据:处理这些数据。ItemWriter:将数据写入地图,以便下一步使用 步骤2:ItemReader:读取地图ItemProcessor:处理地图数据并获取新对象。ItemWriter:将新的进程对象持久化到数据库中。 现在我希望Map在整个作业中保持不变,目前我已经为Map创建了一个不同的POJO类,并

  • 我有一个流应用程序,它从Kafka主题读取数据,从文件读取数据,聚合数据并创建结果。 每5分钟,我想得到多少记录被消耗和记录从文件中读取的计数,并将其发送到另一个流。 我该怎么做?

  • 是否可以为Java8并行流指定自定义线程池?我到处都找不到它。 如果我不能为不同的模块使用不同的线程池,这就意味着我不能在大多数真实世界的情况下安全地使用并行流。 请尝试以下示例。有些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被打破,因此每一步需要1秒(通过线程Hibernate模拟)。问题是其他线程会被卡住,等待中断的任务完成。这是一个虚构的示例,但假设一个servlet

  • 我需要基于另一个流过滤一个流,并获取所有匹配条目的计数。 我已经尝试了以下和各种其他组合,但它没有按预期工作。 这个想法是: < li >对于从0到256的每个数字(流1) < li >查看该号码是否出现在另一个列表中(流2 ),如果出现的话 < li >计算出现次数除以流2中的元素总数(18)。 < li >如果没有出现,请收集0。 这基本上是根据流 2 中的出现次数查找流 1 中数字的频率。

  • 问题内容: 问:我怎样才能从读到的一切入的方式是不是一个手工制作的循环用我自己的字节的缓冲区? 问题答案: 编写一个方法来执行此操作,然后从需要该功能的任何地方调用它。番石榴已经在中提供了代码。我敢肯定,几乎所有其他具有“通用” IO功能的库也都有它,但是Guava是我第一个“入门”库。它震撼了:)