我有一个使用Java DSL配置的集成流,它使用Ftp.inboundchanneladapter
从Ftp服务器中提取文件,然后将其转换为jobrequest
;然后我有一个.handle()
方法,它触发我的批处理作业,所有的工作都是按要求进行的,但是Ftp文件夹中的每个文件都是按顺序运行的
我在Transformerendpoint中添加了CurrentThreadName
它为每个文件打印相同的线程名称
这是我到现在为止所尝试的
1.任务执行器bean
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("Integration");
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
.remoteDirectory("/bar")
.localDirectory(localDir.getFile())
,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
.transform(fileMessageToJobRequest(importUserJob(step1())))
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
.get();
}
@Bean
public MessageChannel inputChannel() {
return new ExecutorChannel(taskExecutor());
}
感谢任何帮助
谢谢
您可以简单地将ExecutorChannel
注入到流中,它将由框架应用到SourcePollingChannelAdapter
。因此,将inputchannel
定义为bean,您只需执行以下操作:
.channel(inputChannel())
在.transform(fileMessageToJobRequest(importUserJob(step1())))
之前。请参阅文件:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
另一方面,要根据.taskExecutor(taskExecutor())
配置并行处理文件,只需将.MaxMessageSperPoll(20)
作为1
。AbstractPollingEndpoint
中的逻辑如下所示:
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
因此,我们确实有并行的任务,但只有当它们到达maxMessageSperPoll
时,在当前的情况下,它是20
。文档中也有一些解释:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
maxMessagesPerPoll属性指定在给定轮询操作中接收的最大消息数。这意味着轮询器继续调用receive()而不等待,直到返回null或达到最大值。例如,如果轮询器具有10秒的间隔触发器和maxMessagesPerPoll设置25,并且轮询队列中有100条消息的通道,则可以在40秒内检索所有100条消息。它抓取25,等待10秒,抓取下一个25,以此类推。
我正在使用Spring集成文件/sftp模块,如何避免下载部分文件?我无法控制将文件推送到ftp/sftp的外部进程。
SI的FTP和SFTP出站适配器允许将带有可自定义后缀的临时文件默认发送到。但我正在与一个遗留系统进行接口,该系统只能很好地使用前缀命名的临时文件重命名(例如->)。 也就是说,我可以使用RemoteFileTemplate的API来禁用临时文件、发送和重命名。 对于实现前缀临时文件重命名方案,有什么比RemoteFileTemplate更好的选项?
我在一个目录中有许多文件,每个文件包含跨多行的文本。目前,我使用以下代码将所有这些文件读取到spark数据集( 但是,这会创建一个数据集,其中每一行都是一行,而不是一个文件。我希望数据集中每行都有一个文件(作为字符串)。 如何在不迭代每个文件并将其作为RDD单独读取的情况下实现这一点?
我想配置一个带有JavaDSL的网关,以递归方式从FTP服务器读取所有文件,因为它们位于不同的文件夹中。 我怎么做?请给我一个特别的代码示例
我有一个Spring Boot2.1.6.Release应用程序,其中我有一个用 但它似乎没有工作,因为它仍然在等待一个执行结束,然后再开始下一个。 在@scheduled注释中使用cron进行并行执行的正确方法是什么?
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。