如何使用java dsl Integrationflows从spring集成触发spring批处理作业。
我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
@Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
TaskExecutor taskExecutor,
MessageSource<File> fileReadingMessageSource) {
return IntegrationFlows.from(fileReadingMessageSource,
c -> c.poller(Pollers.fixedDelay(period)
.taskExecutor(taskExecutor)
.maxMessagesPerPoll(maxMessagesPerPoll)))
.transform(Transformers.fileToString())
.channel(ApplicationConfiguration.INBOUND_CHANNEL)
.get();
}
以下是我的代码:-
@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
@Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
TaskExecutor taskExecutor,
MessageSource<File> fileReadingMessageSource,
JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(fileReadingMessageSource,
c -> c.poller(Pollers.fixedDelay(period)
.taskExecutor(taskExecutor)
.maxMessagesPerPoll(maxMessagesPerPoll)))
.handle(fileMessageToJobRequest(),"toRequest")
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
// fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
// simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
Spring Batch参考手册中有关于此事的干净样品:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。
在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction
我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo
如何使用Spring集成Api轮询特定文件目录中的多个文件,而无需xml配置,最好使用基于Java注释的方法?我想获得轮询文件列表,并对其进行迭代并进一步处理。这是要求。可用于满足此要求的任何示例代码。提前谢谢。下面是我使用的代码片段。 但是不管输入目录中没有文件,消息源实例总是只获取一个文件。不确定如何使其适用于要获取的多个文件。
我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St
我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI