当前位置: 首页 > 知识库问答 >
问题:

使用Spring集成从远程SFTP目录和子目录流式传输

公羊安怡
2023-03-14

我正在使用Spring集成流入站通道适配器,从远程SFTP获取流并解析内容进程的每一行。

我使用:

IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
                          .filter(remoteFileFilter)
                          .remoteDirectory("test_dir"),
                        e -> e.id("sftpInboundAdapter")
                              .autoStartup(true)
                              .poller(Pollers.fixedDelay(fetchInt)))
                .handle(Files.splitter(true, true))
....

它现在可以工作了。但是我只能从test_dir目录中获取文件,但是我需要递归地从这个目录和子目录中获取文件,并解析每一行。

我注意到入站通道适配器是Sftp。内置适配器(sftpSessionFactory)。扫描仪(…) 。它可以扫描子目录。但我没有看到任何关于流式入站通道适配器的内容。

那么,如何在流入站通道适配器中实现递归地从目录获取文件?

谢谢。

共有3个答案

杜楚
2023-03-14

就像dsillman2000评论的那样,我也发现这个答案可能需要更多的解释。

在根据这里的示例弄清楚这一点后,这是我的扩展示例,它对我有用,提取的变量和方法(希望)清楚地说明各个部分是什么或做什么。

依赖关系:主要是组织。springframework。集成:spring集成sftp:2.6.6

package com.example.sftp.incoming;

import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.filters.AbstractFileListFilter;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpOutboundGatewaySpec;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class SftpIncomingRecursiveConfiguration {

    private static final String BASE_REMOTE_FOLDER_TO_GET_FILES_FROM = "/tmp/sftptest/";
    private static final String REMOTE_FOLDER_PATH_AS_EXPRESSION = "'" + BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + "'";

    private static final String INBOUND_CHANNEL_NAME = "sftpGetInputChannel";


    @Value("${demo.sftp.host}")
    private String sftpHost;

    @Value("${demo.sftp.user}")
    private String sftpUser;

    @Value("${demo.sftp.password}")
    private String sftpPassword;

    @Value("${demo.sftp.port}")
    private String sftpPort;

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpHost);
        factory.setPort(Integer.parseInt(sftpPort));
        factory.setUser(sftpUser);
        factory.setPassword(sftpPassword);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }


    // poll for new files every 500ms
    @InboundChannelAdapter(value = INBOUND_CHANNEL_NAME, poller = @Poller(fixedDelay = "500"))
    public String filesForSftpGet() {
        return BASE_REMOTE_FOLDER_TO_GET_FILES_FROM;
    }

    @Bean
    public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return IntegrationFlows
                .from(INBOUND_CHANNEL_NAME)
                .handle(listRemoteFiles(sessionFactory))
                .split()
                .log(logTheFilePath())
                .enrichHeaders(addAMessageHeader())
                .log(logTheMessageHeaders())
                .handle(getTheFile(sessionFactory))
                .split(splitContentIntoLines())
                .log(logTheFileContent())
                .get();
    }

    private SftpOutboundGatewaySpec listRemoteFiles(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return Sftp.outboundGateway(sessionFactory,
                        AbstractRemoteFileOutboundGateway.Command.LS, REMOTE_FOLDER_PATH_AS_EXPRESSION)
                .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
                .filter(onlyFilesWeHaveNotSeenYet())
                .filter(onlyTxtFiles());
    }


    /* Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
    Without it, the program would report the same file over and over again. */
    private SftpPersistentAcceptOnceFileListFilter onlyFilesWeHaveNotSeenYet() {
        return new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "keyPrefix");
    }

    private AbstractFileListFilter<ChannelSftp.LsEntry> onlyTxtFiles() {
        return new AbstractFileListFilter<>() {
            @Override
            public boolean accept(ChannelSftp.LsEntry file) {
                return file.getFilename().endsWith(".txt");
            }
        };
    }

    private Function<Message<Object>, Object> logTheFilePath() {
        return message -> "### File path: " + message.getPayload();
    }

    private Consumer<HeaderEnricherSpec> addAMessageHeader() {
        return headers -> headers.headerExpression("fileToRemove", REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload");
    }

    private Function<Message<Object>, Object> logTheMessageHeaders() {
        return message -> "### Header file info: " + message.getHeaders();
    }

    private SftpOutboundGatewaySpec getTheFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
        return Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET,
                        REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload")
                .options(AbstractRemoteFileOutboundGateway.Option.STREAM);
    }

    private FileSplitter splitContentIntoLines() {
        return new FileSplitter();
    }

    private Function<Message<Object>, Object> logTheFileContent() {
        return message -> "### File content line: '" + message.getPayload() + "'";
    }
}

编辑:请注意,这里有一个区别。另一个示例使用轮询器一次又一次地从“filesForMGET”生成带有远程文件路径的消息,并且该消息有效负载(文件路径)被用作“ls”的参数。我在这里硬编码它,忽略轮询器中的消息内容。

章宏峻
2023-03-14

它对我有效,这是我的完整代码

@Configuration
public class SftpIFConfig {

@InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
public String filesForMGET(){
    return "/upload/done";
}


@Bean
public IntegrationFlow sftpMGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handle(Sftp.outboundGateway(csf,
                            AbstractRemoteFileOutboundGateway.Command.LS, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE,  AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
                    //Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
            .split()
            .log(message -> "file path -> "+message.getPayload())
            .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'/upload/done/' + payload"))
            .log(message -> "Heder file info -> "+message.getHeaders())
            .handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/upload/done/' + payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.STREAM))
            .split(new FileSplitter())
            .log(message -> "File content -> "+message.getPayload())
            .get();
}

@Bean
CachingSessionFactory<ChannelSftp.LsEntry> csf(DefaultSftpSessionFactory sf) {
    return new CachingSessionFactory<>(sf);
}

@Bean
DefaultSftpSessionFactory sf() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost("0.0.0.0");
    factory.setPort(2222);
    factory.setAllowUnknownKeys(true);
    factory.setUser("xxxx");
    factory.setPassword("xxx");
    return factory;
}
咸琪
2023-03-14

您可以使用两个出站网关-第一个执行ls-R(递归列表);拆分结果并使用配置有mget-stream的网关来获取每个文件。

编辑

@SpringBootApplication
public class So60987851Application {

    public static void main(String[] args) {
        SpringApplication.run(So60987851Application.class, args);
    }

    @Bean
    IntegrationFlow flow(SessionFactory<LsEntry> csf) {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
                .handle(Sftp.outboundGateway(csf, Command.LS, "payload")
                        .options(Option.RECURSIVE, Option.NAME_ONLY)
                        // need a more robust metadata store for persistence, unless the files are removed
                        .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
                .split()
                .log()
                .enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
                .handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
                        .options(Option.STREAM))
                .split(new FileSplitter())
                .log()
                // instead of a filter, we can remove the remote file.
                // but needs some logic to wait until all lines read
//              .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
//              .log()
                .get();
    }

    @Bean
    CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
        return new CachingSessionFactory<>(sf);
    }

    @Bean
    DefaultSftpSessionFactory sf() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("10.0.0.8");
        sf.setUser("gpr");
        sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
        sf.setAllowUnknownKeys(true);
        return sf;
    }

}
 类似资料:
  • 我正在使用spring集成从远程sftp服务器中提取文件。一个要求是,在将文件从远程服务器目录拉入本地目录后,需要将拉入文件的副本归档到同一个sftp服务器上的不同目录中。 我在这里检查了不同的选项。本地有文件同步或文件复制选项。但没有找到远程的选项。 我应该尝试Spring集成以外的其他东西吗?我使用Spring集成是因为它已经是应用程序的一部分。这是一个新要求。

  • 我在Spring Boot应用程序中使用Spring集成版本5.4.4。我需要从“server_sftp”目录下的子目录中获取所有的XML文件。为此,我使用带有mget命令的SFTP流入站通道适配器和SFTP出站网关。不幸的是,应用程序只从根目录(server_sftp)下载文件,而不从子目录下载文件。 我哪里出错了? sftp服务器上的目录结构

  • 我正在尝试实现一个Spring Integration类,它获取一个. xml文件并对其进行解析,如果有效,就将其移动到一个“存档”目录,如果无效,就将其移动到一个错误目录。 然而,每当调用< code>calback.execute()时,我都会得到这个错误,我不太明白为什么。 虽然我有一个消息处理程序,但我怀疑这个问题的原因是我没有重写handle方法。但我不知道该怎么做。

  • 问题内容: 我想在子目录/超级目录中执行脚本(我需要先在此子目录/超级目录中)。我无法进入子目录: Python抛出OSError,我不知道为什么。无论是尝试进入现有的子目录还是进入一个目录(如上所述)都没有关系-我总是会遇到相同的错误。 问题答案: 您的代码尝试执行的操作是调用名为的程序。您想要的是称为的命令。 但是是外壳内部的。所以你只能称它为 但这是没有意义的。 由于没有进程可以更改另一个进

  • 在阅读的API时,我错过了很多函数。首先,它建议使用for循环从stream转到。而且我忽略了一个事实,即不是。 如何在Java8中从生成?

  • 我想把文件从远程服务器拉到我的本地服务器。我正在使用Spring Integration SFTP来拉文件。我有以下配置: 我不想在成功拉取后从remotedir中删除文件。这里发生的事情是每30秒,从remotedir的文件就会被拉到我的本地文件。我希望spring batch程序在第一次之后停止,不要再次拉出相同的文件。我怎样才能做到这一点?