当前位置: 首页 > 知识库问答 >
问题:

使用Spring Integration SFTP DSL递归读取文件

琴琪
2023-03-14
@Bean
@InboundChannelAdapter(value = "sftpMgetInputChannel",
    poller = @Poller(fixedDelay = "5000"))
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
             Command.MGET, "'/tmp/remoteDirectory/*'")
            .options(Option.RECURSIVE)
            .regexFileNameFilter("((\\d{8})|*\\.txt)")
            .localDirectoryExpression("sftp-inbound" + "/" + "#remoteDirectory"))
            .channel(remoteFileOutputChannel())
            .get();
}

@Bean
public MessageChannel sftpMgetInboundChannel(){
   return new DirectChannel();
}

@Bean
public PollableChannel remoteFileOutputChannel() {
    return new QueueChannel();
}
@Bean
public MessageHandler messageHandler(){
 return new MessageHandler() { ... }
}

    @InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
    public String filesForMGET(){
      return "'/tmp/input/remoteDirectory/*'";
    }

    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlows.from("sftpMgetInputChannel")
                .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
                 Command.MGET, "payload")
                .options(Option.RECURSIVE)
                .regexFileNameFilter("((\\d{8})|*\\.txt)")
                .localDirectoryExpression("'sftp-inbound/'" + "#remoteDirectory"))
                .handler(messageHandler())
                .get();
    }

    @Bean
    public MessageChannel sftpMgetInboundChannel(){
       return new DirectChannel();
    }

    @Bean
    public MessageHandler messageHandler(){
     return new MessageHandler() { ... }
    }

使用此更新的代码,我得到以下错误:


    rg.springframework.core.NestedIOException: failed to read file; nested exception is 2: No such file
        at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:100)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.read(CachingSessionFactory.java:137)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:176)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:138)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:680)
    Caused by: 2: No such file
        at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2289)
        at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1741)
        at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1011)
        at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:986)
        at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:96)
        ... 22 more

共有1个答案

夏高朗
2023-03-14

将表达式设置为payload(与编辑前的问题中的情况一样),发送到网关的消息有效载荷应该是/tmp/remotefolder/*,该消息有效载荷在内部拆分为远程目录和远程文件名(*)。

为什么我需要指定输出通道?

MGET(检索文件的列表)的结果需要放在某个地方。

@InboundChannelAdapter(value = "sftpMgetInputChannel",
    poller = @Poller(fixedDelay = "5000"))
public String filesForMGET() {
    return "/tmp/remoteDirectory/";
}

@Bean
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
             Command.MGET, "payload")
            .options(Option.RECURSIVE)
            .regexFileNameFilter("((\\d{8})|*\\.txt)")
            .localDirectoryExpression("sftp-inbound" + "/" + "#remoteDirectory"))
            .channel(remoteFileOutputChannel())
            .get();
}
 类似资料:
  • 我想配置一个带有JavaDSL的网关,以递归方式从FTP服务器读取所有文件,因为它们位于不同的文件夹中。 我怎么做?请给我一个特别的代码示例

  • 我还发现了这个使用的网页,但我还是不明白如何将其应用于我的情况?

  • 嗨,我在理解为什么我的递归逻辑返回第一个输入时有一些问题,尽管它似乎没有通过验证检查。Java中的一个简单例子: 我的理解是,函数要调用自己,直到满足条件,返回一个0到9之间的数。条件检查似乎是可行的,但是无论第一个数字是什么,总是被返回。一个执行示例产生:

  • 本文向大家介绍java递归读取目录下所有文件的方法,包括了java递归读取目录下所有文件的方法的使用技巧和注意事项,需要的朋友参考一下 java递归读取目录下的所有文件(包含子目录下的所有文件)大概思路如下:通过file.listFiles()方法获取目录下的所有文件(包含子目录下的所有文件),得到files[]数组,然后遍历得到的所有文件,通过isFile(文件)和isDirectory(文件夹

  • 问题内容: 我有一个根目录目录,其中包含多个子目录,所有子目录均包含文件名data.txt。我想做的是编写一个脚本,该脚本进入“根”目录,然后读取所有子目录并读取子目录中的每个“ data.txt”,然后将每个data.txt文件中的内容写入输出文件。 这是我的代码片段: 我的dosomething()部分-如果仅针对一个文件运行该部分,我已经测试并确认它可以正常工作。我还确认,如果我告诉它打印文