当前位置: 首页 > 知识库问答 >
问题:

SFTP Spring集成中的文件轮询和过滤问题

壤驷敏学
2023-03-14

我正在尝试通过SFTP Spring集成拉取远程文件并将其转储到本地驱动器中,并启动Spring批处理作业(步骤1读取、处理和写入),接下来是步骤2,其中一个Tasklet将下载到本地另一个位置的本地文件存档并删除下载的文件。

假设下载的文件是“data.service”、“data.maintenance”和“data.transaction”。我只处理“data.service”,但所有3个都已存档并删除。

假设一组新的文件将在远程位置被覆盖,我希望以上步骤以特定的时间间隔发生。

<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<bean id="acceptOnceFileListFilter" class="org.springframework.integration.file.filters.AcceptOnceFileListFilter"/>

conf

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        preserve-timestamp="true"
        filename-pattern="*.*"
        local-filter="acceptAllFileListFilter" >
    <int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>

现在发生的是...上面的操作触发了这个序列,并在一个无休止的循环中运行,而不需要从远程获取新文件。详情如下:

我从一个空的本地文件夹开始,远程服务器有所有3个文件[开始程序]

    null
        local-filter="acceptOnceFileListFilter" >
    null

对于不同职位的建议,我有一个独特的工作参数,如下所示:

@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
    jobParametersBuilder.addLong("currentTime", new Long(System.currentTimeMillis()));
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}

尝试了不同的过滤器组合,没有一个达到我的目的。任何样本都会有很大的帮助,从上周开始就在努力解决这个问题。

以下是我根据建议所做的修改:

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter"/>
            <bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
            </bean>                     
        </list>
    </constructor-arg>
</bean>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"  
        filter="compositeFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="600000" />
</int-sftp:inbound-channel-adapter>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="file:${inbound.remote.directory}"
        auto-create-local-directory="false"
        delete-remote-files="false"  
        filter="compositeFilter"
        >
    <int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                <constructor-arg name="store" ref="metadataStore"/>
                <constructor-arg value="*.*"/>
            </bean>
            <bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
                <constructor-arg name="store" ref="metadataStore"/>
                <constructor-arg value="*.*"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean name="metadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

2014-07-17 22:27:14,199[task-scheduler-1]调试org.SpringFramework.Integration.channel.PublishSubscribeChannel-postSend(SENT=TRUE)对通道“错误通道”,消息:[Payload MessagingException Content=org.SpringFramework.Messaging.MessagingException:将html" target="_blank">远程目录同步到本地目录时出现问题][Headers={ID=84CCCECF-20EC-F67B-3F74-A938BF9ABF3D,TimeStamp=1405661234197}]

删除FileSystemPersistentAcceptonCeFileListFilter筛选器离开“SFTPPersistentAcceptonCeFileListFilter”后更新

2014-07-18 09:29:22,069[task-scheduler-3]调试org.SpringFramework.Integration.channel.PublishSubscribeChannel-postSend(SENT=TRUE)对通道“错误通道”,消息:[Payload MessagingException Content=org.SpringFramework.Messaging.MessagingException:将远程目录同步到本地目录时出现问题][Headers={ID=55304E38-6F82-8350-E763-0F5BCB507788,TimeStamp=1405700962068}]

共有1个答案

龙永思
2023-03-14

可能是这样的情况:如果丢弃在服务器上的新文件具有相同的名称(在第一次迭代中下载),那么spring integration将不会再次尝试下载这些文件。“AcceptonCEFileListFilter”似乎会限制sftp集成组件再次下载同一文件。

更新:这里是一些示例代码,它每10秒轮询一次远程目录,而不引入任何过滤器(我不知道为什么您需要它们,因为您想下载文件)。如果您不想更改文件名,您必须确保的唯一一件事是,当下一个文件被丢弃在远程服务器上时,您下载的本地文件需要删除。我验证了它,对我来说工作得很好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
   xsi:schemaLocation="http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd

   http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">


<int:channel id="incomingChannel"/>

<bean id="ftpClientFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="host"/>
    <property name="username" value="username"/>
    <property name="password" value="password"/>
</bean>

<!-- poll every 10 secs without worrying about file names-->

<int-ftp:inbound-channel-adapter id="triggerFtpInBound"
                                 channel="incomingChannel"
                                 auto-create-local-directory="true"
                                 local-directory="C:\home\temp\ftp"
                                 remote-directory="/export/home/rbaljinder/ftp-test"
                                 filename-pattern="*.*"
                                 session-factory="ftpClientFactory">
    <int:poller cron="1/10 * * * * *" max-messages-per-poll="1"/>
</int-ftp:inbound-channel-adapter>

更新:现在似乎起作用了。added local-filter=“AcceptAllFileListFilter”。

 <int-ftp:inbound-channel-adapter id="triggerFtpInBound"
                                 channel="incomingChannel"
                                 auto-create-local-directory="true"
                                 local-directory="C:\home\temp\ftp"
                                 remote-directory="/export/home/cwk2/ftp-test"
                                 filename-pattern="*.*"
                                 session-factory="ftpClientFactory"
                                 local-filter="acceptAllFileListFilter">
    <int:poller cron="1/10 * * * * *" max-messages-per-poll="1"/>
</int-ftp:inbound-channel-adapter>
<bean id="acceptAllFileListFilter"
      class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<int:service-activator id="jobServiceActivator"
                       input-channel="incomingChannel"
                       ref="triggerJobLauncher"
                       method="launch"/>

@Component("triggerJobLauncher")
public static class TriggerJobLauncher {

    @Autowired
    JobLauncher jobLauncher;

    public void launch(File file) throws Exception {
        System.out.println("test:" + file);
    }

}
 类似资料:
  • 如何使用Spring集成Api轮询特定文件目录中的多个文件,而无需xml配置,最好使用基于Java注释的方法?我想获得轮询文件列表,并对其进行迭代并进一步处理。这是要求。可用于满足此要求的任何示例代码。提前谢谢。下面是我使用的代码片段。 但是不管输入目录中没有文件,消息源实例总是只获取一个文件。不确定如何使其适用于要获取的多个文件。

  • 我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo

  • 我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

  • 问题内容: 我想问一下是否有可能使用hibernate方式做到这一点。假设我已经运行了HQL并检索了一个集合。是否可以使用hibernate进一步过滤它? 我试图在标头类中使用,并在查询之前添加session.enable(),但似乎无法正常工作。 样例代码 精简HQL 问题答案: 不。至少不是您问的方式。一旦您要求Hibernate(使用方法)访问数据库,Hibernate就发挥了作用,结果现在

  • 问题内容: 我正在寻找一种包含/排除文件模式并从呼叫中排除目录的方法。 这是我现在正在做的事情: 有一个更好的方法吗?怎么样? 问题答案: 此解决方案用于将glob模式转换为正则表达式(假定include仅用于文件):