当前位置: 首页 > 知识库问答 >
问题:

为每个Ftp文件并发运行Spring集成流

杨昆
2023-03-14

我有一个使用Java DSL配置的集成流,它使用Ftp.inboundchanneladapter从Ftp服务器中提取文件,然后将其转换为jobrequest;然后我有一个.handle()方法,它触发我的批处理作业,所有的工作都是按要求进行的,但是Ftp文件夹中的每个文件都是按顺序运行的

我在Transformerendpoint中添加了CurrentThreadName它为每个文件打印相同的线程名称

这是我到现在为止所尝试的

1.任务执行器bean

 @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("Integration");

    }
  @Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
    return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                    .remoteDirectory("/bar")
                    .localDirectory(localDir.getFile())
            ,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
            .transform(fileMessageToJobRequest(importUserJob(step1())))
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
            .get();
}
 @Bean
public MessageChannel inputChannel() {
    return new ExecutorChannel(taskExecutor());
}

感谢任何帮助

谢谢

共有1个答案

徐焱
2023-03-14

您可以简单地将ExecutorChannel注入到流中,它将由框架应用到SourcePollingChannelAdapter。因此,将inputchannel定义为bean,您只需执行以下操作:

.channel(inputChannel())

.transform(fileMessageToJobRequest(importUserJob(step1())))之前。请参阅文件:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels

另一方面,要根据.taskExecutor(taskExecutor())配置并行处理文件,只需将.MaxMessageSperPoll(20)作为1AbstractPollingEndpoint中的逻辑如下所示:

this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }

因此,我们确实有并行的任务,但只有当它们到达maxMessageSperPoll时,在当前的情况下,它是20。文档中也有一些解释:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

maxMessagesPerPoll属性指定在给定轮询操作中接收的最大消息数。这意味着轮询器继续调用receive()而不等待,直到返回null或达到最大值。例如,如果轮询器具有10秒的间隔触发器和maxMessagesPerPoll设置25,并且轮询队列中有100条消息的通道,则可以在40秒内检索所有100条消息。它抓取25,等待10秒,抓取下一个25,以此类推。

 类似资料:
  • 我正在使用Spring集成文件/sftp模块,如何避免下载部分文件?我无法控制将文件推送到ftp/sftp的外部进程。

  • SI的FTP和SFTP出站适配器允许将带有可自定义后缀的临时文件默认发送到。但我正在与一个遗留系统进行接口,该系统只能很好地使用前缀命名的临时文件重命名(例如->)。 也就是说,我可以使用RemoteFileTemplate的API来禁用临时文件、发送和重命名。 对于实现前缀临时文件重命名方案,有什么比RemoteFileTemplate更好的选项?

  • 我在一个目录中有许多文件,每个文件包含跨多行的文本。目前,我使用以下代码将所有这些文件读取到spark数据集( 但是,这会创建一个数据集,其中每一行都是一行,而不是一个文件。我希望数据集中每行都有一个文件(作为字符串)。 如何在不迭代每个文件并将其作为RDD单独读取的情况下实现这一点?

  • 我想配置一个带有JavaDSL的网关,以递归方式从FTP服务器读取所有文件,因为它们位于不同的文件夹中。 我怎么做?请给我一个特别的代码示例

  • 我有一个Spring Boot2.1.6.Release应用程序,其中我有一个用 但它似乎没有工作,因为它仍然在等待一个执行结束,然后再开始下一个。 在@scheduled注释中使用cron进行并行执行的正确方法是什么?

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。