当前位置: 首页 > 知识库问答 >
问题:

如何从Spring集成java dsl集成文件轮询集成流触发Spring批处理

席宜修
2023-03-14

如何使用java dsl Integrationflows从spring集成触发spring批处理作业。

我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .transform(Transformers.fileToString())
                     .channel(ApplicationConfiguration.INBOUND_CHANNEL)                 

             .get();
}

共有2个答案

严知
2023-03-14

以下是我的代码:-

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource,
                                              JobLaunchingGateway jobLaunchingGateway) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .handle(fileMessageToJobRequest(),"toRequest")
                       .handle(jobLaunchingGateway)
                        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                             .get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
  //  fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
  //  simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}
郭星文
2023-03-14

Spring Batch参考手册中有关于此事的干净样品:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
 类似资料:
  • 我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。

  • 在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction

  • 我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo

  • 如何使用Spring集成Api轮询特定文件目录中的多个文件,而无需xml配置,最好使用基于Java注释的方法?我想获得轮询文件列表,并对其进行迭代并进一步处理。这是要求。可用于满足此要求的任何示例代码。提前谢谢。下面是我使用的代码片段。 但是不管输入目录中没有文件,消息源实例总是只获取一个文件。不确定如何使其适用于要获取的多个文件。

  • 我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI