当前位置: 首页 > 知识库问答 >
问题:

使用spring入站文件适配器进行并发处理

曹鸿风
2023-03-14

我有一个文件系统目录,我想轮询文件,然后并发处理每个文件,每个文件只有一个线程。我的印象是,在封面下,In边界文件适配器将每个文件放入队列中,这样我就可以使用下游的执行器通道来并发处理以后的调用。我在JavaConfig中实现如下:

return IntegrationFlows
            .from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
                            .scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
                            .filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
                    e -> e.poller(Pollers
                            .fixedDelay(fileSystemPollDelay)
                            .get()))
            .channel(MessageChannels.executor(executor).get())
            .transform(File::toPath)
            .enrichHeaders(cleanUpConfigurer)
            .get()

执行器通道下游的每个通道本身就是一个直接通道。

然而,我发现下游服务的并发性很差。对于缓存的线程池,我看到相同的线程基本上是串行地执行下游代码,而如果使用固定的池执行器,我看到不同的线程在静态串行执行中进行权衡。

我还尝试在民意调查者和执行者频道之间架起一座桥梁,但没有成功。

共有1个答案

章誉
2023-03-14

这只是因为罩下的SourcePollingChannelAdapterFactoryBean

if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE){
    // the default is 1 since a source might return
    // a non-null and non-interruptible value every time it is invoked
    this.pollerMetadata.setMaxMessagesPerPoll(1);
}

所以,每个都是您的。fixedDelay(fileSystemPollDelay)队列中只轮询一个文件进行处理。

所以,只需增加。maxMessagesPerPoll()到适合您的系统值,并享受并发的乐趣!

顺便说一句,没有理由在轮询适配器之后引入ExecutorChannel。您只需为. poller()使用. taskExecitor()完全出于相同的并发原因。

 类似资料:
  • 我在实现某些功能时遇到了一些问题,当我删除文件时,我注意到了一些不一致。 1)当我删除多个文件时,有时并不是所有文件都被转移到正确的目录。

  • 我正在使用spring integration sftp入站流通道适配器,它每隔几秒钟就轮询一次。入站适配器正在多次挑选相同的文件进行处理。下面是配置。 上面代码中的sample.customFilter是SftpRegexPatternFileListFilter的一个子类,其中我将accept方法修改如下,以便根据Spring SFTP vanging filename-regex中提供的解决

  • 问题内容: 我已经配置了spring SFTP以将文件从远程路径集中到本地,以处理一些作业,然后同时删除本地和远程文件。 下面的配置工作正常,除了删除本地文件外,我没有找到任何配置来删除本地文件,例如 这里,当文件从远程传输到本地时,服务激活器被调用。 服务激活器完成作业后,如何配置删除本地文件? 编辑:通过以下更改解决了从远程和本地删除有效负载的问题: 问题答案: 消息发送到流之后,入站适配器永

  • 基础知识: 使用带集成的Spring 4.1.1,引导和1.0.0的DSL。 多个入站SFTP适配器在不同的时间表上从不同的供应商获取文件。 每个集成流在文件下载后将标头附加到消息中,以标识供应商源。 使用MessagePublishingErrorHandler处理异常。 标准消息流在消息处理成功或消息未能完成时通知外部监控解决方案。使用消息头来识别哪个流失败。 在我们收到消息后,成功和错误流都

  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置