当前位置: 首页 > 知识库问答 >
问题:

Spring批处理集成Java DSL和RunIdIncrementer

刁跃
2023-03-14

我希望能够使用在作业配置中定义的runidincrementer以相同的参数(基本上是相同的文件)重新启动作业(可能是因为应用程序已经重新启动,或者由于某些原因我们再次收到了相同的文件)。

不幸的是,run.id=1没有增加,我得到一个JobInstanceAlReadyCompleteException

作业配置

@Autowired
private JobBuilderFactory jobBuilders;

@Bean
public Job importOffersJob() {

    Job job = jobBuilders.get("importOffersJob")
            .start(importOffersStep)
            .listener(traceJobExecutionListener())  
            .incrementer(new RunIdIncrementer())
            .build();

    return job;
}
@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(SftpSessionFactory())
                .regexFilter(".*\\.xml.mini$")
                .deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
                .preserveTimestamp(Boolean.TRUE)
                .autoCreateLocalDirectory(Boolean.TRUE)
                .remoteDirectory(intCfg.getSftpRemoteDirectory())
                .localDirectory(new File(intCfg.getSftpLocalDirectory())
            ), 
                e -> e.id("sftpInboundAdapter")
                .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
            .transform(fileToJobLaunchRequestTransformer())         
            .handle(jobLaunchingGw())
            .handle(logger())
            .get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {

    private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);

    @Autowired
    @Qualifier("importOffersJob")
    private Job job;

    @Override
    public JobLaunchRequest transform(Message<File> source) {
        log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
                .getAbsolutePath());

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
        //jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());

        JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());

        return new JobLaunchRequest(job, jobParams);
        }
     }

多谢

共有1个答案

袁致远
2023-03-14

runidincrementer.getnext():

public JobParameters getNext(JobParameters parameters) {

    JobParameters params = (parameters == null) ? new JobParameters() : parameters;

    long id = params.getLong(key, 0L) + 1;
    return new JobParametersBuilder(params).addLong(key, id).toJobParameters();
}

您每次都要创建一个新的作业参数,所以它总是返回1,因为run.id不存在。

如果将JobParametersBuilder移动到transformer中的一个字段,与Job一起,它将工作,但仅适用于应用程序的实例化。下次启动应用程序时,它将在1点再次启动。

 类似资料:
  • 在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction

  • 我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer

  • 我是Spring的新人。最近我试着让Spring批处理和Spring集成一起工作。我想有一个JobListener,它会监听消息到达特定的频道并启动Spring批处理作业。 我在github上找到了一个例子(https://github.com/chrisjs/spring-batch-scaling/tree/master/message-job-launch)我试图以某种方式将Spring批处

  • 我正在尝试将BeanIO与spring Batch集成。使用BeanIO,我正在读取一个固定长度的流文件。我已经测试并验证了使用独立类读取平面文件的代码,它可以无缝地工作,但是当我试图将它与Spring Batch集成时,BeanIOFlatFileItemReader的doRead()方法没有被调用,而且我编写的RedemptionEventCustomProcessor是如何直接被调用的。 我

  • 我有一个spring批处理应用程序,它从文件中读取数据,进行一些处理,最后编写一个定制的输出。这一切都是一步到位的。在下一步中,我将使用一个tasklet来归档输入文件(移动到另一个文件夹)。这个应用程序运行良好。但是,现在我需要在远程服务器上对sftp输出文件进行进一步处理。我找到了一种使用spring integration实现sftp的方法,在这里我创建了一个输入通道,该通道将反馈给outb

  • 我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫