当前位置: 首页 > 知识库问答 >
问题:

Spring批处理在tasklet中执行动态生成的步骤

曹君墨
2023-03-14

第2步。根据在步骤1中创建的对象列表中有多少项创建步骤列表。

第三步。尝试执行步骤2中创建的步骤列表中的步骤。

下面在executeDynamicStepsTasklet()中执行x个步骤。虽然代码运行时没有任何错误,但它似乎没有做任何事情。我在那个方法中的内容看起来正确吗?

@Configuration
public class ExportMasterListCsvJobConfig {

public static final String JOB_NAME = "exportMasterListCsv";
@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") 
public int chunkSize;

@Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") 
public String masterListSql;

@Autowired
public DataSource onlineStagingDb;

@Value("${out.dir}") 
public String outDir;

@Value("${exportMasterListCsv.generatePromoStartDateEndDateGroupings.promoStartDateEndDateSql}") 
private String promoStartDateEndDateSql;


private List<DivisionIdPromoCompStartDtEndDtGrouping> divisionIdPromoCompStartDtEndDtGrouping;

private List<Step> dynamicSteps = Collections.synchronizedList(new ArrayList<Step>()) ;


@Bean
public Job exportMasterListCsvJob(
        @Qualifier("createJobDatesStep") Step createJobDatesStep,
        @Qualifier("createDynamicStepsStep") Step createDynamicStepsStep,
        @Qualifier("executeDynamicStepsStep") Step executeDynamicStepsStep) {

    return jobBuilderFactory.get(JOB_NAME)
            .flow(createJobDatesStep)
            .next(createDynamicStepsStep)
            .next(executeDynamicStepsStep)
            .end().build();
}   


@Bean
public Step executeDynamicStepsStep(
        @Qualifier("executeDynamicStepsTasklet")  Tasklet executeDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("executeDynamicStepsStep")
                .tasklet(executeDynamicStepsTasklet)
                .build();               
}

@Bean
public Tasklet executeDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            FlowStep flowStep = new FlowStep(createParallelFlow());
            SimpleJobBuilder jobBuilder = jobBuilderFactory.get("myNewJob").start(flowStep);
            return RepeatStatus.FINISHED;
        }
    };
}

public Flow createParallelFlow() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1); 

    List<Flow> flows = dynamicSteps.stream()
            .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
            .collect(Collectors.toList());

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows.toArray(new Flow[flows.size()]))
            .build();
}

@Bean

public Step createDynamicStepsStep(
        @Qualifier("createDynamicStepsTasklet")  Tasklet createDynamicStepsTasklet) {

    return  stepBuilderFactory
                .get("createDynamicStepsStep")
                .tasklet(createDynamicStepsTasklet)
                .build();               
}   

@Bean
@JobScope
public Tasklet createDynamicStepsTasklet() {

    return new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            for (DivisionIdPromoCompStartDtEndDtGrouping grp: divisionIdPromoCompStartDtEndDtGrouping){

                System.err.println("grp: " + grp);

                String stepName = "stp_" + grp;

                String fileName = grp + FlatFileConstants.EXTENSION_CSV;

                Step dynamicStep = 
                        stepBuilderFactory.get(stepName)
                        .<MasterList,MasterList> chunk(10)
                        .reader(queryStagingDbReader(
                                grp.getDivisionId(), 
                                grp.getRpmPromoCompDetailStartDate(), 
                                grp.getRpmPromoCompDetailEndDate()))
                        .writer(masterListFileWriter(fileName))                                
                        .build(); 

                dynamicSteps.add(dynamicStep);

            } 
            System.err.println("createDynamicStepsTasklet dynamicSteps: " + dynamicSteps);
            return RepeatStatus.FINISHED;
        }
    };
}


public FlatFileItemWriter<MasterList> masterListFileWriter(String fileName) {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, fileName )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

共有1个答案

焦正德
2023-03-14

这样不行。您的Tasklet只是创建一个作业,第一步是FlowStep。使用jobBuilderfactory只需创建作业。它不会发射它。方法名“start”可能会产生误导,因为它只定义了第一步。但它不会启动作业。

一旦作业启动,就不能更改其结构(其步骤和子步骤)。因此,不可能根据步骤1中计算的内容来配置步骤2中的flowstep。(当然,您可以在springbatch结构的更深处进行一些黑客操作,并直接修改bean,等等……但是您不想这样做)。

我建议您使用一种带有适当的postConstruct方法的“SetupBean”,该方法被注入到配置作业的类中。这个“SetupBean”负责计算正在处理的对象列表。

@Component
public class SetUpBean {

  private List<Object> myObjects;

  @PostConstruct
  public afterPropertiesSet() {
    myObjects = ...;
  }

  public List<Object> getMyObjects() {
   return myObjects;
  }
}

@Configuration
public class JobConfiguration {

   @Autowired
   private JobBuilderFactory jobBuilderFactory;

   @Autowired
   private StepBuilderFactory stepBuilderFactory;

   @Autowired
   private SetUpBean setup;

   ... 
}
 类似资料:
  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

  • 我正在尝试修复Spring Batch中的一个问题,这个问题最近一直困扰着我们的系统。我们有一份工作,在大多数情况下都很好。下载和处理数据是一个多步骤的工作。 问题是有时工作会爆棚。也许我们试图连接到的服务器抛出了错误,或者我们在工作进行到一半时关闭了服务器。此时,下次我们的quartz调度程序尝试运行该作业时,它似乎什么也不做。以下是此作业定义的删节版本: 委婉地说,我是Spring Batch

  • 我使用FlatFileItemReader创建了一个spring批处理作业,它从一个分隔文件中读取数据,然后使用JdbcBatchItemWriter写入DB。我的setp配置如下所示。 上面的配置是为每100行打开单独的事务,因此,如果在完成tasklet(步骤1)之前发生故障,则我无法恢复之前提交的行。有没有办法在一个事务中运行整个tasklet?。 另外:我使用MapJobRepositor

  • 我在没有ItemWriter的情况下定义了我的tasklet,如下所示: 我得到了这个错误: 配置问题:

  • 我有一个Spring批处理tasklet,我不知道如何从中失败。我想检查某些参数,如果它们不存在,则在该步骤中使作业失败。 注释掉的行是我试图让工作退出的行。有人有过这样的经历吗?

  • 我有一个非常简单的spring批处理,它从一个表中更新了一百万条记录。因为它非常简单,所以我尝试只实现一个更新表的Tasklet。 但我想用10个记录的步骤来promise。是可以在tasklet中实现这一点,还是我必须将itemReader/ItemWriter与块一起使用? 提前谢谢。