当前位置: 首页 > 知识库问答 >
问题:

Spring批处理和Spring集成的集成问题-“未为endpoint定义轮询器”异常

钱雅逸
2023-03-14
<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${host}"/>
        <property name="port" value="${port}"/>
        <property name="user" value="${user}"/>
        <property name="password" value="${password}"/>
    </bean>

    <int:channel id="inboundFileChannel"><int:queue/></int:channel>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="jobLaunchReplyChannel"/>

    <int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="inboundFileChannel"
            session-factory="sftpSessionFactory"
            local-directory="file:/chofac/data/mex/registry/outbox"
            remote-directory="/chofac/SFTP/MEX/outbox"
            auto-create-local-directory="true"
            delete-remote-files="false"
            filename-pattern="*.txt">
        <int:poller max-messages-per-poll="-1" fixed-rate="1000" />
    </int-sftp:inbound-channel-adapter>

    <int:transformer input-channel="inboundFileChannel"
        output-channel="outboundJobRequestChannel">
        <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
            <property name="job" ref="responseFileReaderJob"/>
            <!-- <property name="fileParameterName" value="input.file.name"/> -->
        </bean>
    </int:transformer>

    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
        reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
        <int:poller fixed-rate="1000"/>
    </batch-int:job-launching-gateway>

    <int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

    <batch:job id="responseFileReaderJob">
        <batch:step id="dailyReaderJob">
            <batch:tasklet>
                <batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

我正在使用下面的测试用例运行这个程序:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {

    @Resource(name="inboundFileChannel")
    PollableChannel localFileChannel;

    @Test
    public void runDemo(){
        System.out.println("Received first file message: " + localFileChannel.receive());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

我得到这个错误:

当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。

最常见的建议是添加一个全局轮询器,但这会导致以下错误:

<int:poller default="true" fixed-delay="50"/>

原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramework.Integration.config.ConsumerEndpointFactoryBean#1”指定轮询器,因为“outbound JobRequestChannel”是SubscribableChannel(不可轮询)。

(我是所有这些方面的新手,Spring、Spring批处理和Spring集成)任何帮助都将非常感谢。提前道谢。

更新1

我删除了#2中的投票器,如下所示

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

并删除了全局轮询器

<int:poller default="true" fixed-delay="50"/> 
<int:poller default="true" fixed-delay="50"/>

如果删除以下异常

Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel]  at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376)    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)  at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979)  at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)

下面是我的配置:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        filename-pattern="*.*">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />
    <!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
    <!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>

<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

我从主程序触发此操作,如下所示:

ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false" 
        filename-pattern ="*.*"
        local-filter="acceptAllFileListFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />

<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>

程序运行到一个有异常的连续循环中。我觉得轮询发生得非常频繁(每秒),尽管我在这里为轮询器的完整配置和日志指定了45秒。

共有1个答案

魏凡
2023-03-14

OutboundJobRequestChannel是一个SubscribableChannel,因此在batch-int:job-launking-gateway上不能有 InboundFileChannel是一个QueueChannel,因此它的使用者需要一个轮询器(转换器)。

请注意bean名称中的#0#1int-sftp:inbound-channel-adapter(#0)正确地具有轮询器;把另一个拿掉就行了。

对于这种情况,您并不真正需要全局(默认)轮询器(但它正在用于您的transformer)。

编辑:

(答复对你问题的评论)。

这是默认行为。默认情况下,local-filterAcceptonCEFilelistFilter。如果在下一轮投票之前删除该文件,则可以将其更改为AcceptAllFileListFilter

持久筛选器使用元数据存储并使用文件名和lastmodified时间戳来确定筛选器是否应该接受(传递)该文件。

编辑2:

该示例只是将文件转储到队列通道中,并在中接收;您已经订购了变压器。

PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
 类似资料:
  • 我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St

  • 我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

  • 我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer

  • 如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri

  • 我希望能够使用在作业配置中定义的以相同的参数(基本上是相同的文件)重新启动作业(可能是因为应用程序已经重新启动,或者由于某些原因我们再次收到了相同的文件)。 不幸的是,run.id=1没有增加,我得到一个 作业配置 多谢