当前位置: 首页 > 知识库问答 >
问题:

如何使用spring集成sftp出站网关从远程服务器下载文件?

巫马盛
2023-03-14

我有spring批处理设置(远程分区),它从文件中读取项目并处理它们。

null

public class PrepareExchangeListListener implements StepExecutionListener {

    private String localDir;
    private String remoteDir;

    private DirectChannel requestChannel;
    private PollableChannel replyChannel;

    public String getLocalDir() {
        return localDir;
    }

    public void setLocalDir(String localDir) {
        this.localDir = localDir;
    }

    public String getRemoteDir() {
        return remoteDir;
    }

    public void setRemoteDir(String remoteDir) {
        this.remoteDir = remoteDir;
    }

    public DirectChannel getRequestChannel() {
        return requestChannel;
    }

    public void setRequestChannel(DirectChannel requestChannel) {
        this.requestChannel = requestChannel;
    }

    public PollableChannel getReplyChannel() {
        return replyChannel;
    }

    public void setReplyChannel(PollableChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // TODO Auto-generated method stub

        System.out.println("Listener triggered.");

        this.requestChannel.send(new GenericMessage<Object>(this.remoteDir
                + "/" + "*"));

        Message<?> result = this.replyChannel.receive(100000);

        List<File> localFiles = (List<File>) result.getPayload();

        for (File file : localFiles) {

            System.out.println(file.getName());

        }

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

        return null;
    }

}

我指的是这个测试用例

https://github.com/spring-projects/spring-integration/blob/master/spring-integration-sftp/src/test/Java/org/springframework/integration/sftp/outbound/sftpoutboundtests.Java

https://github.com/spring-projects/spring-integration/blob/master/spring-integration-sftp/src/test/Java/org/springframework/integration/sftp/outbound/sftpserveroutboundtests-context.xml

下面是我的配置,

<rabbit:template id="importExchangesAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="importExchangesQueue"
    reply-timeout="${import.exchanges.partition.timeout}">
</rabbit:template>

<int:channel id="importExchangesOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="importExchangesInboundStagingChannel" />

<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
    reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesOutboundChannel"
    p:receiveTimeout="${import.exchanges.partition.timeout}" />


<beans:bean id="importExchangesPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="importExchangesStep" p:gridSize="${import.exchanges.grid.size}"
    p:messagingOperations-ref="importExchangesMessagingTemplate" />

<int:aggregator ref="importExchangesPartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${import.exchanges.step.timeout}"
    input-channel="importExchangesInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${import.exchanges.consumer.concurrency}"
    request-channel="importExchangesInboundChannel" reply-channel="importExchangesOutboundStagingChannel"
    queue-names="importExchangesQueue" connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="importExchangesInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />

<int:channel id="importExchangesOutboundStagingChannel" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.writers.ImportExchangesAndEclsItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${import.exchanges.item.timeout}"
    scope="step" />

<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges/exchanges_*.txt"
    scope="step" />

<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />




<beans:bean id="prepareExchangeListListener"
    class="com.st.batch.listeners.PrepareExchangeListListener"
    p:requestChannel-ref="inboundMGetRecursiveFiltered" p:localDir="/tmp/spring/batch"
    p:replyChannel-ref="outputSftp" p:remoteDir="/tmp/spring/batch" />

<beans:bean id="ftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="myRemoteServer" />
    <beans:property name="user" value="username" />
    <beans:property name="password" value="password" />
</beans:bean>

<int:channel id="outputSftp">
    <int:queue />
</int:channel>


<int:channel id="inboundMGetRecursiveFiltered" />

<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
    request-channel="inboundMGetRecursiveFiltered" command="mget"
    expression="payload" command-options="-R" local-directory="/tmp/spring/batch"
    reply-channel="outputSftp" />


<step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
            commit-interval="${import.exchanges.commit.interval}" />
    <listeners>
        <listener ref="prepareExchangeListListener" />
    </listeners>


    </tasklet>

</step>

<job id="importExchangesJob" restartable="true">
    <step id="importExchangesStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />

    <listeners>
        <listener ref="prepareExchangeListListener" />
    </listeners>
    </step>
</job>

我是不是做错了什么,好像什么都不起作用。

另外,int-sftp:outbound-gateway可以有scope=“step”吗?如果我尝试添加,它给出了STS中的错误。

日志

下面是启用了调试级别的日志

19:42:18,329  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Connecting to SERVER_IP port 22
19:42:18,473  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Connection established
19:42:18,623  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Remote version string: SSH-2.0-OpenSSH_5.8p1 Debian-7ubuntu1
19:42:18,623  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Local version string: SSH-2.0-JSCH-0.1.45
19:42:18,624  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
19:42:18,668  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes256-ctr is not available.
19:42:18,668  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes192-ctr is not available.
19:42:18,668  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes256-cbc is not available.
19:42:18,669  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes192-cbc is not available.
19:42:18,669  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - arcfour256 is not available.
19:42:18,669  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - CheckKexes: diffie-hellman-group14-sha1
19:42:18,673  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - diffie-hellman-group14-sha1 is not available.
19:42:18,673  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXINIT sent
19:42:18,767  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXINIT received
19:42:18,767  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - kex: server->client aes128-ctr hmac-md5 none
19:42:18,767  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - kex: client->server aes128-ctr hmac-md5 none
19:42:18,778  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXDH_INIT sent
19:42:18,778  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - expecting SSH_MSG_KEXDH_REPLY
19:42:18,935  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - ssh_rsa_verify: signature true
19:42:18,938  WARN jobLauncherTaskExecutor-1 jcraft.jsch:55 - Permanently added 'SERVER_IP' (RSA) to the list of known hosts.
19:42:18,939  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_NEWKEYS sent
19:42:18,939  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_NEWKEYS received
19:42:18,945  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_SERVICE_REQUEST sent
19:42:19,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:19,088  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_SERVICE_ACCEPT received
19:42:19,918  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentications that can continue: publickey,keyboard-interactive,password
19:42:19,918  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Next authentication method: publickey
19:42:19,919  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentications that can continue: password
19:42:19,920  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Next authentication method: password
19:42:20,001 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:20,084  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentication succeeded (password).
19:42:20,542 DEBUG jobLauncherTaskExecutor-1 util.SimplePool:184 - Obtained new org.springframework.integration.sftp.session.SftpSession@7d4d6823.
19:42:21,001 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:21,134 DEBUG jobLauncherTaskExecutor-1 session.CachingSessionFactory:109 - Releasing Session back to the pool.
19:42:21,134 DEBUG jobLauncherTaskExecutor-1 util.SimplePool:215 - Releasing org.springframework.integration.sftp.session.SftpSession@7d4d6823 back to the pool
19:42:21,134 DEBUG jobLauncherTaskExecutor-1 gateway.SftpOutboundGateway:197 - handler 'org.springframework.integration.sftp.gateway.SftpOutboundGateway#0' sending reply Message: [Payload=[]][Headers={timestamp=1403532741134, id=e4f6e7b3-b8bf-4364-99b4-ca889e5aa7fa, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
19:42:21,135 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:224 - preSend on channel 'outputSftp', message: [Payload=[]][Headers={timestamp=1403532741134, id=e4f6e7b3-b8bf-4364-99b4-ca889e5aa7fa, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
19:42:21,135 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:237 - postSend (sent=true) on channel 'outputSftp', message: [Payload=[]][Headers={timestamp=1403532741134, id=e4f6e7b3-b8bf-4364-99b4-ca889e5aa7fa, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
19:42:21,135 DEBUG jobLauncherTaskExecutor-1 channel.DirectChannel:237 - postSend (sent=true) on channel 'inboundMGetRecursiveFiltered', message: [Payload=/tmp/spring/batch/*][Headers={timestamp=1403532738315, id=334c6d05-be25-4a9b-b9d8-773103da1cfd, file_remoteDirectory=/tmp/spring/batch, file_remoteFile=*.txt}]
19:42:21,135 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:258 - postReceive on channel 'outputSftp', message: [Payload=[]][Headers={timestamp=1403532741134, id=e4f6e7b3-b8bf-4364-99b4-ca889e5aa7fa, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
19:42:21,193 DEBUG jobLauncherTaskExecutor-1 scope.StepScope:108 - Creating object in scope=step, name=scopedTarget.importExchangesPartitioner
19:42:21,193 DEBUG jobLauncherTaskExecutor-1 support.DefaultListableBeanFactory:435 - Creating instance of bean 'scopedTarget.importExchangesPartitioner'
19:42:21,204 DEBUG jobLauncherTaskExecutor-1 scope.StepScope:136 - Registered destruction callback in scope=step, name=scopedTarget.importExchangesPartitioner
19:42:21,204 DEBUG jobLauncherTaskExecutor-1 support.DefaultListableBeanFactory:463 - Finished creating instance of bean 'scopedTarget.importExchangesPartitioner'
19:42:22,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:23,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:24,001 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:25,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:26,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
19:42:27,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'

此处为完整日志

http://pastebin.com/4kdbgqch

更新:

下面只是一个简单的测试而不是整个用例,

测试作业

<beans:bean id="testStepTasklet"
    class="com.sta.batch.foundation.tasklets.TestTasklet"
    p:requestChannel-ref="inboundMGetRecursive" p:replyChannel-ref="output" />

<job id="testJob">
    <step id="testStep">
        <tasklet ref="testStepTasklet" />
    </step>
</job>

sftp出站网关配置

<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="MY_SERVER_ID" />
    <beans:property name="user" value="myUser" />
    <beans:property name="password" value="myPassword" />
</beans:bean>


<int:channel id="output">
    <int:queue />
</int:channel>

<int:channel id="inboundMGetRecursive" />

<int-sftp:outbound-gateway session-factory="sftpSessionFactory"
    request-channel="inboundMGetRecursive" command="mget" expression="payload"
    command-options="-R" local-directory="/tmp/spring/batch/"
    reply-channel="output" />

测试任务

public class TestTasklet implements Tasklet, InitializingBean {

    private DirectChannel requestChannel;
    private PollableChannel replyChannel;

    public DirectChannel getRequestChannel() {
        return requestChannel;
    }

    public void setRequestChannel(DirectChannel requestChannel) {
        this.requestChannel = requestChannel;
    }

    public PollableChannel getReplyChannel() {
        return replyChannel;
    }

    public void setReplyChannel(PollableChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // TODO Auto-generated method stub

    }

    @SuppressWarnings("unchecked")
    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        String dir = "/tmp/spring/batch/";
        this.requestChannel.send(new GenericMessage<Object>(dir + "*"));
        Message<?> result = this.replyChannel.receive(1000);

        List<File> localFiles = (List<File>) result.getPayload();

        for (File file : localFiles) {
            System.out.println(file.getName());
        }

        return RepeatStatus.FINISHED;

    }

}

日志:

12:30:50,017 DEBUG jobLauncherTaskExecutor-1 context.StepContextRepeatCallback:76 - Chunk execution starting: queue size=0
12:30:50,025 DEBUG jobLauncherTaskExecutor-1 channel.DirectChannel:224 - preSend on channel 'inboundMGetRecursive', message: [Payload=/tmp/spring/batch/*][Headers={timestamp=1403593250024, id=7080f48d-6b7d-4d31-b0b8-b3384dc93e53}]
12:30:50,025 DEBUG jobLauncherTaskExecutor-1 gateway.SftpOutboundGateway:67 - org.springframework.integration.sftp.gateway.SftpOutboundGateway#0 received message: [Payload=/tmp/spring/batch/*][Headers={timestamp=1403593250024, id=7080f48d-6b7d-4d31-b0b8-b3384dc93e53}]
12:30:50,038  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Connecting to SERVER_IP port 22
12:30:50,181  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Connection established
12:30:50,332  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Remote version string: SSH-2.0-OpenSSH_5.8p1 Debian-7ubuntu1
12:30:50,333  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Local version string: SSH-2.0-JSCH-0.1.45
12:30:50,333  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
12:30:50,376  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes256-ctr is not available.
12:30:50,377  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes192-ctr is not available.
12:30:50,377  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes256-cbc is not available.
12:30:50,377  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - aes192-cbc is not available.
12:30:50,377  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - arcfour256 is not available.
12:30:50,377  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - CheckKexes: diffie-hellman-group14-sha1
12:30:50,381  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - diffie-hellman-group14-sha1 is not available.
12:30:50,382  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXINIT sent
12:30:50,477  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXINIT received
12:30:50,477  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - kex: server->client aes128-ctr hmac-md5 none
12:30:50,477  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - kex: client->server aes128-ctr hmac-md5 none
12:30:50,489  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_KEXDH_INIT sent
12:30:50,489  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - expecting SSH_MSG_KEXDH_REPLY
12:30:50,645  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - ssh_rsa_verify: signature true
12:30:50,647  WARN jobLauncherTaskExecutor-1 jcraft.jsch:55 - Permanently added 'SERVER_IP' (RSA) to the list of known hosts.
12:30:50,648  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_NEWKEYS sent
12:30:50,648  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_NEWKEYS received
12:30:50,651  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_SERVICE_REQUEST sent
12:30:50,794  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - SSH_MSG_SERVICE_ACCEPT received
12:30:50,964  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentications that can continue: publickey,keyboard-interactive,password
12:30:50,964  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Next authentication method: publickey
12:30:50,966  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentications that can continue: password
12:30:50,966  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Next authentication method: password
12:30:51,001 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:51,126  INFO jobLauncherTaskExecutor-1 jcraft.jsch:52 - Authentication succeeded (password).
12:30:51,581 DEBUG jobLauncherTaskExecutor-1 util.SimplePool:184 - Obtained new org.springframework.integration.sftp.session.SftpSession@329c5e61.
12:30:52,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:52,160 DEBUG jobLauncherTaskExecutor-1 session.CachingSessionFactory:109 - Releasing Session back to the pool.
12:30:52,160 DEBUG jobLauncherTaskExecutor-1 util.SimplePool:215 - Releasing org.springframework.integration.sftp.session.SftpSession@329c5e61 back to the pool
12:30:52,161 DEBUG jobLauncherTaskExecutor-1 gateway.SftpOutboundGateway:197 - handler 'org.springframework.integration.sftp.gateway.SftpOutboundGateway#0' sending reply Message: [Payload=[]][Headers={timestamp=1403593252160, id=00899cf1-5dd7-4192-b0ae-9812fe07283e, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
12:30:52,161 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:224 - preSend on channel 'output', message: [Payload=[]][Headers={timestamp=1403593252160, id=00899cf1-5dd7-4192-b0ae-9812fe07283e, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
12:30:52,161 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:237 - postSend (sent=true) on channel 'output', message: [Payload=[]][Headers={timestamp=1403593252160, id=00899cf1-5dd7-4192-b0ae-9812fe07283e, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
12:30:52,162 DEBUG jobLauncherTaskExecutor-1 channel.DirectChannel:237 - postSend (sent=true) on channel 'inboundMGetRecursive', message: [Payload=/tmp/spring/batch/*][Headers={timestamp=1403593250024, id=7080f48d-6b7d-4d31-b0b8-b3384dc93e53}]
12:30:52,162 DEBUG jobLauncherTaskExecutor-1 channel.QueueChannel:258 - postReceive on channel 'output', message: [Payload=[]][Headers={timestamp=1403593252160, id=00899cf1-5dd7-4192-b0ae-9812fe07283e, file_remoteDirectory=/tmp/spring/batch/, file_remoteFile=*}]
12:30:52,162 DEBUG jobLauncherTaskExecutor-1 tasklet.TaskletStep:433 - Applying contribution: [StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
12:30:52,164 DEBUG jobLauncherTaskExecutor-1 tasklet.TaskletStep:447 - Saving step execution before commit: StepExecution: id=371, version=1, name=testStep, status=STARTED, exitStatus=EXECUTING, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
12:30:52,283 DEBUG jobLauncherTaskExecutor-1 support.RepeatTemplate:437 - Repeat is complete according to policy and result value.
12:30:52,284 DEBUG jobLauncherTaskExecutor-1 step.AbstractStep:212 - Step execution success: id=371
12:30:52,383 DEBUG jobLauncherTaskExecutor-1 step.AbstractStep:276 - Step execution complete: StepExecution: id=371, version=3, name=testStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0
12:30:52,432 DEBUG jobLauncherTaskExecutor-1 support.SimpleFlow:175 - Completed state=testJob.testStep with status=COMPLETED
12:30:52,433 DEBUG jobLauncherTaskExecutor-1 support.SimpleFlow:161 - Handling state=testJob.end1
12:30:52,433 DEBUG jobLauncherTaskExecutor-1 support.SimpleFlow:175 - Completed state=testJob.end1 with status=COMPLETED
12:30:52,434 DEBUG jobLauncherTaskExecutor-1 job.AbstractJob:305 - Job execution complete: JobExecution: id=91, version=1, startTime=Tue Jun 24 12:30:49 IST 2014, endTime=null, lastUpdated=Tue Jun 24 12:30:49 IST 2014, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=91, version=0, Job=[testJob]], jobParameters=[{b=6, batch_id=2014-06-14}]
12:30:52,457  INFO jobLauncherTaskExecutor-1 support.SimpleJobLauncher:136 - Job: [FlowJob: [name=testJob]] completed with the following parameters: [{b=6, batch_id=2014-06-14}] and the following status: [COMPLETED]
12:30:53,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:54,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:55,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:56,001 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'
12:30:57,000 DEBUG task-scheduler-3 endpoint.SourcePollingChannelAdapter:71 - Received no Message during the poll, returning 'false'

共有1个答案

莘昊
2023-03-14

我建议您将事情分解成可管理的块--让SFTP的东西在独立测试中工作;然后把它组装到你的工作中。

打开调试日志以查看网关发生了什么。

不,您不能将spring集成组件放入step范围,您必须以编程方式或在子上下文中创建它,如(动态ftp示例)(https://github.com/spring-projects/-integration-samples/tree/master/advanced/dynamic-ftp)所示。

 类似资料:
  • 我有一个要求,我想使用spring集成,但不想使用spring boot从sftp服务器下载文件。我在这里使用jcraft库。希望使用spring集成库。

  • 问题内容: 我正在使用jsch从服务器下载文件,下面是我的代码。 com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2629) at com.jcraft.jsch.ChannelSftp._get(ChannelSftp.java:977) at com.jcraft.jsch.ChannelSftp.get(Channe

  • 我想把文件从远程服务器拉到我的本地服务器。我正在使用Spring Integration SFTP来拉文件。我有以下配置: 我不想在成功拉取后从remotedir中删除文件。这里发生的事情是每30秒,从remotedir的文件就会被拉到我的本地文件。我希望spring batch程序在第一次之后停止,不要再次拉出相同的文件。我怎样才能做到这一点?

  • 我当前的项目是基于Spring集成的。我正在使用spring Boot开发这个项目。 我的目标是使用Spring Integr来完成以下任务。 > 连接到SFTP 检查是否在本地特定文件夹中创建了目录 检查特定于(CSV的文件的合格文件扩展名 将所有内容从SFTP远程目录下载到本地目录 逐行读取本地目录中的文件,并提取特定的列信息。 你能给我一些建议吗? 我怎样才能得到换乘开始时间?注意:我必须将

  • 我想从batch tasklet中调用sftp:outbound gateway,以便从sftp服务器下载文件。我看过其他与这个主题相关的帖子,但我不确定我做错了什么。有人能根据我的配置给我一个提示吗?我的批处理工作正常,所以问题只是在批处理步骤中调用sftp组件。我用注释标记了Spring集成部分,这样就更容易阅读相关配置。 我可以在日志中看到:调试[o.s.I.e.SourcePollingC

  • 问题内容: 我想编写连接到我的大学SFTP服务器的脚本,并通过练习下载最新文件。到目前为止,我已经从Paramiko示例中更改了一些代码,但是我不知道如何下载最新文件。 这是我的代码: 问题答案: 使用而不是来获得具有属性(包括文件时间戳记)的列表。 然后,找到具有最大属性的文件条目。 代码如下: