当前位置: 首页 > 知识库问答 >
问题:

Spring批处理和Spring集成的集成问题——“没有为endpoint定义轮询器”异常

羊舌成周
2023-03-14

我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。

我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。

<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${host}"/>
        <property name="port" value="${port}"/>
        <property name="user" value="${user}"/>
        <property name="password" value="${password}"/>
    </bean>

    <int:channel id="inboundFileChannel"><int:queue/></int:channel>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="jobLaunchReplyChannel"/>

    <int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="inboundFileChannel"
            session-factory="sftpSessionFactory"
            local-directory="file:/chofac/data/mex/registry/outbox"
            remote-directory="/chofac/SFTP/MEX/outbox"
            auto-create-local-directory="true"
            delete-remote-files="false"
            filename-pattern="*.txt">
        <int:poller max-messages-per-poll="-1" fixed-rate="1000" />
    </int-sftp:inbound-channel-adapter>

    <int:transformer input-channel="inboundFileChannel"
        output-channel="outboundJobRequestChannel">
        <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
            <property name="job" ref="responseFileReaderJob"/>
            <!-- <property name="fileParameterName" value="input.file.name"/> -->
        </bean>
    </int:transformer>

    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
        reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
        <int:poller fixed-rate="1000"/>
    </batch-int:job-launching-gateway>

    <int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

    <batch:job id="responseFileReaderJob">
        <batch:step id="dailyReaderJob">
            <batch:tasklet>
                <batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

我使用下面的测试用例运行这个程序:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {

    @Resource(name="inboundFileChannel")
    PollableChannel localFileChannel;

    @Test
    public void runDemo(){
        System.out.println("Received first file message: " + localFileChannel.receive());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

我得到了这个错误:

在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了其他不同的错误,我似乎偏离了主要问题。

最常见的建议是添加一个全球民意调查,但这导致了以下错误:

<int:poller default="true" fixed-delay="50"/>

原因:java。lang.IllegalArgumentException:不应为endpoint的组织指定轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#1’,因为“outboundJobRequestChannel”是SubscribableChannel(不可轮询)。

(我在所有这些方面都是新手,Spring,Spring批处理和Spring集成)任何帮助都将不胜感激。事先谢谢。

更新1

我删除了#2中的轮询器,如下所示

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

并删除了全球民意调查

<int:poller default="true" fixed-delay="50"/> 

我得到以下例外:

线程“main”组织中出现异常。springframework。豆。工厂BeanCreationException:创建名为“org”的bean时出错。springframework。整合。配置。ConsumerEndpointFactoryBean#0”:调用init方法失败;嵌套的例外是java。lang.IllegalArgumentException:尚未为endpoint的组织定义轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#0',上下文中没有可用的默认轮询器。在org。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的initializeBean(AbstractAutowireCapableBeanFactory.java:1553)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的doCreateBean(AbstractAutowireCapableBeanFactory.java:539)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。在org上创建Bean(AbstractAutowireCapableBeanFactory.java:475)。springframework。豆。工厂支持AbstractBeanFactory 1美元。org上的getObject(AbstractBeanFactory.java:304)。springframework。豆。工厂支持DefaultSingletonBeanRegistry。getSingleton(DefaultSingletonBeanRegistry.java:228)位于org。springframework。豆。工厂支持抽象工厂。org上的doGetBean(AbstractBeanFactory.java:300)。springframework。豆。工厂支持抽象工厂。org上的getBean(AbstractBeanFactory.java:195)。springframework。豆。工厂支持DefaultListableBeanFactory。org上的预实例化单例(DefaultListableBeanFactory.java:681)。springframework。上下文支持AbstractApplicationContext。在org上完成BeanFactoryInitialization(AbstractApplicationContext.java:760)。springframework。上下文支持AbstractApplicationContext。在org上刷新(AbstractApplicationContext.java:482)。springframework。上下文支持ClassPathXmlApplicationContext。(ClassPathXmlApplicationContext.java:139)网址:org。springframework。上下文支持ClassPathXmlApplicationContext。(ClassPathXmlApplicationContext.java:93)在com上。乔法克。造币厂整合。下载文件runbatch。main(DownloadFileRunBatch.java:15)由:java引起。lang.IllegalArgumentException:尚未为endpoint的组织定义轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#0',上下文中没有可用的默认轮询器。在org。springframework。util。明确肯定org上的notNull(Assert.java:112)。springframework。整合。配置。ConsumerdPointFactoryBean。在org上初始化EndPoint(ConsumerEndpointFactoryBean.java:238)。springframework。整合。配置。ConsumerdPointFactoryBean。AfterPropertieSet(ConsumerEndpointFactoryBean.java:187)位于org。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。initializeBean(AbstractAutowireCapableBeanFactory.java:1549)。。。还有12个

然而,如果我离开全局轮询器,SFTP传输将发生,作业将被触发

<int:poller default="true" fixed-delay="50"/>

更新二

如果我删除

Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel]  at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376)    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)  at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979)  at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)

下面是我的配置:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        filename-pattern="*.*">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />
    <!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
    <!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>

<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

我从一个主程序触发如下:

ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());

更新三

web上的示例配置、JUnit和Spring示例。

更新4

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false" 
        filename-pattern ="*.*"
        local-filter="acceptAllFileListFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="45000" />

<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>

程序运行到一个有异常的连续循环中。我觉得轮询非常频繁(每秒),尽管我在这里为轮询器完整配置和日志指定了45秒。

共有1个答案

袁鸿畅
2023-03-14

outboundJobRequestChannel是一个SubscribableChannel,因此您不能拥有

注意bean名称中的#0#1int-sftp:inbound channel adapter#0)正确地具有轮询器;只需移除另一个。

在这种情况下,您并不真正需要全局(默认)轮询器(但它正在用于变压器)。

编辑:

(回复对您问题的评论)。

这是默认行为。默认情况下,local过滤器是一个AcceptOnceFileListFilter。如果在下次轮询之前删除该文件,则可以将其更改为AcceptAllFileListFilter

如果要将文件保留在磁盘上,但检测到它已被更改,请使用filesystemtempersistanceptoncefileListFilter;由于不删除远程文件,还应将过滤器设置为sftpersistentAcceptonFileListFilter(实际上是包装此过滤器和其中一个模式过滤器的CompositeFileListFilter)。

否则,您将继续获取相同的文件,并且,如果没有preseve-time戳,本地过滤器每次都会认为这是一个新文件。

持久过滤器使用元数据存储,并使用文件名和lastModified时间戳来确定过滤器是否应该接受(传递)该文件。

编辑2:

该示例只是将文件转储到队列通道中,并在main中接收;您订阅了一个变压器。

是这个密码。。。

PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());

这会给你带来当前的悲伤——只需从你的主屏幕上删除这些线条。

 类似资料:
  • 我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo

  • 我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

  • 我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer

  • 如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri

  • 我正在尝试将BeanIO与spring Batch集成。使用BeanIO,我正在读取一个固定长度的流文件。我已经测试并验证了使用独立类读取平面文件的代码,它可以无缝地工作,但是当我试图将它与Spring Batch集成时,BeanIOFlatFileItemReader的doRead()方法没有被调用,而且我编写的RedemptionEventCustomProcessor是如何直接被调用的。 我