当前位置: 首页 > 知识库问答 >
问题:

如何从spring批处理调用sftp:出站网关?

南宫才艺
2023-03-14

我想从batch tasklet中调用sftp:outbound gateway,以便从sftp服务器下载文件。我看过其他与这个主题相关的帖子,但我不确定我做错了什么。有人能根据我的配置给我一个提示吗?我的批处理工作正常,所以问题只是在批处理步骤中调用sftp组件。我用注释标记了Spring集成部分,这样就更容易阅读相关配置。

我可以在日志中看到:调试[o.s.I.e.SourcePollingChannelAdapter]在轮询期间未收到任何消息,返回“false”。所以我没有收到文件,但为什么?

提前感谢您花时间进行分析!

<bean id="ftsSftpClientFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="${my.import.sftp.localhost}"/>
    <property name="user" value="${my.import.sftp.username}"/>
    <property name="password" value="${my.import.sftp.passwort}"/>
</bean>

  <!-- Start: Spring Integration -->

    <int:channel id="replyChannel" >
        <int:queue/>
    </int:channel>

    <int:channel id="requestChannel" />

    <int-sftp:outbound-gateway id="sftpGateway"
                               session-factory="ftsSftpClientFactory"
                               request-channel="requestChannel"
                               reply-channel="replyChannel"
                               auto-startup="true"
                               command="get"
                               command-options="-P"
                               expression="payload"
                               remote-directory="."
                               local-directory="${my.import.sftp.copy.file.destinationpath}">
    </int-sftp:outbound-gateway>

<bean name="copyFileTasklet" class="com.mydomain.CopyFileTasklet">
    <property name="channel" ref="replyChannel" />
    <property name="pollableChannel" ref="requestChannel" />
</bean>
<!-- Start: Spring Batch -->
<bean name="myImportTask" class="com.mydomain.MyImportTask">
    <property name="job" ref="unternehmungImportJob"/>
    <property name="jobLauncher" ref="jobLauncher"/>
</bean>

<bean id="jobDetail"
      class="com.mydomain.MyImportJob">
    <property name="myImportTask" ref="myImportTask" />
</bean>

<!--suppress SpringBatchModel -->
<batch:job id="myImportJob">
    <batch:step id="copy-file-step" next="my-import-step">
        <batch:tasklet ref="copyFileTasklet"/>
    </batch:step>
    <batch:step id="my-import-step">
        <batch:tasklet>
            <batch:chunk reader="myItemReader"
                         writer="myItemWriter"
                         commit-interval="10000">
                <!--
                skip-limit="10000"
                <batch:skippable-exception-classes>
                   <batch:include class="java.lang.Exception"/>
                   <batch:exclude class="java.io.FileNotFoundException"/>
                </batch:skippable-exception-classes> -->
            </batch:chunk>
            <batch:transaction-attributes isolation="DEFAULT" propagation="REQUIRED"/>
        </batch:tasklet>
    </batch:step>
</batch:job>

<bean id="myItemReader" scope="step" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="linesToSkip" value="1"/>
    <property name="encoding" value="${my.import.batch.encoding}" />
    <property name="resource" value="${my.import.batch.input.resource}"/>
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer" ref="lineTokenizer"/>
            <property name="fieldSetMapper">
                <bean class="com.mydomain.MyImportMapper"/>
            </property>
        </bean>
    </property>
</bean>

<bean id="myItemWriter" class="com.mydomain.MyItemWriter">
    <property name="myApplicationService" ref="defaultmyApplicationService" />
</bean>

<bean id="lineTokenizer" class="com.mydomain.DelimitedLineTokenizerWithEOF">
    <property name="delimiter" value="${my.import.batch.delimiter}" />
    <property name="eofMarker" value="${my.import.batch.eof.marker}" />
</bean>

公共类CopyFileTasklet实现Tasklet{

private MessageChannel requestChannel;

private PollableChannel replyChannel;

public void setRequestChannel(MessageChannel requestChannel) {
    this.requestChannel = requestChannel;
}

public void setReplyChannel(PollableChannel replyChannel) {
    this.replyChannel = replyChannel;
}

@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

    Message<?> result = replyChannel.receive(10000);
    Object file = result.getPayload();       
    return RepeatStatus.FINISHED;
}

}

共有1个答案

华宇
2023-03-14

您的问题是,您没有从自定义的Tasklet初始化集成流。当然,如果您之前没有发送请求,您将无法从回复频道接收任何内容。

如果您只需要处理集成流并从中获得结果,那么最好使用POJI

public interface SftpGateway {

   File download(String fileName);

}

<gateway id="sftpGateway" service-interface="com.my.proj.SftpGateway"
    default-request-channel="requestChannel"/>


<bean name="copyFileTasklet" class="com.mydomain.CopyFileTasklet">
    <property name="sftpGateway" ref="sftpGateway" />
</bean>

类似的东西。

 类似资料:
  • 我是Spring integration SFTP的新手。现在,我想从多个目录下载文件。那么SFTP出站网关似乎是我的首选,但我只找到了使用XML config的示例。如何使用Java config来实现这一点呢?我的配置类: 但当我启动应用程序时,什么也没发生,哪里出了问题? ---更新---我的测试类

  • 我正在实现一个tcp客户端,它将向服务器发送请求并期望响应。但是当客户端第一次连接到服务器时,服务器会发送一个hello msg,通过tcp出站网关,它期望发送然后接收。我怎么做接收(只有当第一次连接)然后做我通常的发送和接收逻辑?如果我需要保持连接存活,有没有办法通过某个调度任务将keep alive msg发送到相同的连接?

  • 我的FlatFileItemWriter回调有一个奇怪的问题。我有一个自定义ItemWriter实现FlatFileFolterCallback和FlatFileHeaderCallback。因此,我在我的FlatFileItemWriter中设置页眉和页脚回调如下: ItemWriter Bean FlatFileItemWriter Bean 步进豆 我的writeFooter、writeHe

  • https:// /gateway/services/integration/api/getrecordsfromservice1?transactionid=1111111111&id=100100100100157 integration是为IntegrationService在网关中注册的服务名称。类似地,service1是用于service1的。 我不能理解的是: null 我的入站和出站

  • 我有spring批处理设置(远程分区),它从文件中读取项目并处理它们。 null 我指的是这个测试用例 https://github.com/spring-projects/spring-integration/blob/master/spring-integration-sftp/src/test/Java/org/springframework/integration/sftp/outboun

  • 我有一个http出站网关,它将json消息发布到rest服务,现在rest将以json消息类型的http错误状态响应,以防我们的应用程序捕获任何错误。在将json消息发布到rest服务并获得http成功状态的愉快场景中,我们的应用程序也应该捕获json消息。 现在我通过Spring集成实现了什么,我能够发送消息并获得成功响应并捕获它,但在错误状态下Spring行为会抛出异常,我如何更改行为? 如果