当前位置: 首页 > 知识库问答 >
问题:

Spring批处理和Spring集成。无法配置JobListener

卫嘉谊
2023-03-14

我是Spring的新人。最近我试着让Spring批处理和Spring集成一起工作。我想有一个JobListener,它会监听消息到达特定的频道并启动Spring批处理作业。

我在github上找到了一个例子(https://github.com/chrisjs/spring-batch-scaling/tree/master/message-job-launch)我试图以某种方式将Spring批处理和Spring集成复制到一起,如下所示:

<!--Incomming channel OneToOne-->
<int:channel id="requests-channel"/>

<!--For multiple consumers OneToMany-->
<int:publish-subscribe-channel id="reply-channel"/>

<!--Channel for file adapter-->
<int:channel id="file-adapter-reply-channel"/>

<int:channel id="statuses">
    <int:queue capacity="10"/>
</int:channel>


<int:channel id="jobLaunchReplyChannel"/>


<!--Intercept request-->
<int-http:inbound-gateway request-channel="requests-channel"
                          supported-methods="PUT"
                          path="/testData/setProfileDescription"
                          reply-timeout="30000"
                          reply-channel="reply-channel">
</int-http:inbound-gateway>


<!--Sending HTTP response back to user OR either 'no reply received within timeout'-->
<bean id="profileDescriptionActivator"
      class="ru.tcsbank.service.integrations.activators.ProfileDescriptionActivator"/>

<int:service-activator ref="profileDescriptionActivator"
                       input-channel="requests-channel"
                       output-channel="reply-channel"
                       method="httpMessageActivator"/>


<!--Write profile description to file-->
<bean id="custom-file-name-generator"
      class="ru.tcsbank.service.integrations.transformers_generators.ProfilesFileAdapterNameGenerator"/>
<file:outbound-channel-adapter channel="file-adapter-reply-channel"
                               directory="file:out"
                               filename-generator="custom-file-name-generator"/>


<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" lazy-init="true" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/testdb"/>
    <property name="username" value="test_user"/>
    <property name="password" value="qwerty123"/>
</bean>


<bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
    <property name="autoProxy" value="true"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<bean id="jobRepositoryInDB" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>


<bean id="itemProcessor" class="ru.tcsbank.service.batch_processing.CustomItemProcessor"/>

<bean id="itemReader" class="ru.tcsbank.service.batch_processing.CustomReader" scope="step">
    <property name="resource" value="classpath:fileOut/*.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value=","/>
                    <property name="names" value="id,firstName,lastName"/>
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="ru.tcsbank.service.batch_processing.ProfileDescriptionLineMapper"/>
            </property>
        </bean>
    </property>
</bean>
<bean id="itemWriter" class="ru.tcsbank.service.batch_processing.CustomWriter"/>

<batch:job id="helloWorldJob" job-repository="jobRepositoryInDB">
    <batch:listeners>
        <batch:listener ref="jobListener"/>
    </batch:listeners>
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>


<int:transformer input-channel="reply-channel" output-channel="file-adapter-reply-channel">
    <bean class="ru.tcsbank.service.batch_processing.FileMessageToJobRequest">
        <property name="job" ref="helloWorldJob"/>
        <property name="fileParameterName" value="input.file.name"/>
    </bean>
</int:transformer>


<bean id="jobListener" class="ru.tcsbank.service.batch_processing.CustomJobExecutionListener">
    <constructor-arg index="0" ref="notificationSender"/>
</bean>

<batch-int:job-launching-gateway request-channel="reply-channel"
                                 reply-channel="file-adapter-reply-channel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

<int:channel id="notificationsChannel"/>
<int:gateway id="notificationSender"
             service-interface="ru.tcsbank.service.batch_processing.NotificationSender"
             default-request-channel="notificationsChannel"/>

我希望我的helloWorldJob在我的jobListener收到来自notificationsAnnel消息时运行。但它不起作用(不接收来自NotificationsAnnel的消息),然后抛出如下错误:

Dispatcher没有频道应用程序的订户。“通知”。;嵌套异常是

共有1个答案

严元白
2023-03-14

很难理解您想要用所有这些定制代码实现什么,但我可以说的是,在您的配置中没有订阅该notificationsAnnel。您确实可以通过notificationSender网关向它发送消息,但您不提供任何endpoint来使用notificationsChannel

在您在链接中提到的示例中,我们有如下内容:

<int-jms:outbound-channel-adapter id="notifications" destination-name="notifications"
              channel="notificationsChannel"/>

因此,发送到notificationsAnnel的消息会被放入JMS代理上的notifications队列中。你的样品泄露了这样一个订阅者。因此,我只能解释一个例外的原因,但绝对不能告诉你该怎么做。

使现代化

您不能在解决方案中使用notificationSender。看起来这只是CustomJobExecutionListener的结果。因此,如果您不需要监听作业流程,只需删除该CustomJobExecutionListener,从而删除该notificationSender声明以及NotificationsAnnel定义。

你在评论中提出的所有其他问题都超出了这个问题的范围。请考虑在单独的线程中提高这些关注点。

 类似资料:
  • Spring Integration Java DSL Reference和Spring Batch Java配置文档说明了如何将Java配置用于Spring Integration和Spring Batch。 但它们没有说明如何为Spring批处理集成配置它。如何使用DSL配置JobLaunchingGateway? 干杯,曼诺

  • 我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer

  • 我希望能够使用在作业配置中定义的以相同的参数(基本上是相同的文件)重新启动作业(可能是因为应用程序已经重新启动,或者由于某些原因我们再次收到了相同的文件)。 不幸的是,run.id=1没有增加,我得到一个 作业配置 多谢

  • 我们正在处理一个Spring批处理项目(Spring Boot1.2.2.Release),要求使用Spring SFTP集成以一定频率轮询从服务器位置读取文件。我们使用java config实现了Spring批处理,并在使用Spring Integration java config的过程中实现了Spring批处理。我找不到描述上述情况的例子。我浏览了各种链接,但看到的主要是XML配置示例。 h

  • 我有一个spring批处理应用程序,它从文件中读取数据,进行一些处理,最后编写一个定制的输出。这一切都是一步到位的。在下一步中,我将使用一个tasklet来归档输入文件(移动到另一个文件夹)。这个应用程序运行良好。但是,现在我需要在远程服务器上对sftp输出文件进行进一步处理。我找到了一种使用spring integration实现sftp的方法,在这里我创建了一个输入通道,该通道将反馈给outb

  • 我有下一个spring批处理配置类: 启动应用程序时,我收到下一个异常: